Skip to content

Commit b4f0fd3

Browse files
authored
refactor(query): prevent masking/row access policy name conflicts (#18937)
* feat(query): prevent masking/row access policy name conflicts * extract check and fetch_id from loop * delete loop * fix * remove extra get_pb * use ?? replace double unwrap * fix flaky test
1 parent 535e40a commit b4f0fd3

File tree

19 files changed

+247
-173
lines changed

19 files changed

+247
-173
lines changed

โ€ŽCargo.lockโ€Ž

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/meta/api/src/data_mask_api.rsโ€Ž

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,26 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_meta_app::data_mask::data_mask_name_ident;
1516
use databend_common_meta_app::data_mask::CreateDatamaskReply;
1617
use databend_common_meta_app::data_mask::CreateDatamaskReq;
1718
use databend_common_meta_app::data_mask::DataMaskId;
1819
use databend_common_meta_app::data_mask::DataMaskNameIdent;
1920
use databend_common_meta_app::data_mask::DatamaskMeta;
2021
use databend_common_meta_app::tenant::Tenant;
22+
use databend_common_meta_app::tenant_key::errors::ExistError;
2123
use databend_common_meta_types::MetaError;
2224
use databend_common_meta_types::SeqV;
2325

2426
use crate::errors::MaskingPolicyError;
25-
use crate::kv_app_error::KVAppError;
2627
use crate::meta_txn_error::MetaTxnError;
2728

2829
#[async_trait::async_trait]
2930
pub trait DatamaskApi: Send + Sync {
3031
async fn create_data_mask(
3132
&self,
3233
req: CreateDatamaskReq,
33-
) -> Result<CreateDatamaskReply, KVAppError>;
34+
) -> Result<Result<CreateDatamaskReply, ExistError<data_mask_name_ident::Resource>>, MetaError>;
3435

3536
/// On success, returns the dropped id and data mask.
3637
/// Returning None, means nothing is removed.

โ€Žsrc/meta/api/src/data_mask_api_impl.rsโ€Ž

Lines changed: 81 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_meta_app::app_error::AppError;
15+
use databend_common_meta_app::data_mask::data_mask_name_ident;
1616
use databend_common_meta_app::data_mask::CreateDatamaskReply;
1717
use databend_common_meta_app::data_mask::CreateDatamaskReq;
1818
use databend_common_meta_app::data_mask::DataMaskId;
@@ -24,8 +24,10 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
2424
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
2525
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
2626
use databend_common_meta_app::id_generator::IdGenerator;
27+
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
2728
use databend_common_meta_app::schema::CreateOption;
2829
use databend_common_meta_app::tenant::Tenant;
30+
use databend_common_meta_app::tenant_key::errors::ExistError;
2931
use databend_common_meta_app::KeyWithTenant;
3032
use databend_common_meta_kvapi::kvapi;
3133
use databend_common_meta_kvapi::kvapi::DirName;
@@ -38,7 +40,6 @@ use log::debug;
3840
use crate::data_mask_api::DatamaskApi;
3941
use crate::errors::MaskingPolicyError;
4042
use crate::fetch_id;
41-
use crate::kv_app_error::KVAppError;
4243
use crate::kv_pb_api::KVPbApi;
4344
use crate::meta_txn_error::MetaTxnError;
4445
use crate::txn_backoff::txn_backoff;
@@ -55,87 +56,93 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
5556
async fn create_data_mask(
5657
&self,
5758
req: CreateDatamaskReq,
58-
) -> Result<CreateDatamaskReply, KVAppError> {
59+
) -> Result<Result<CreateDatamaskReply, ExistError<data_mask_name_ident::Resource>>, MetaError>
60+
{
5961
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());
6062

6163
let name_ident = &req.name;
6264

63-
let mut trials = txn_backoff(None, func_name!());
64-
let id = loop {
65-
trials.next().unwrap()?.await;
65+
let row_access_name_ident = RowAccessPolicyNameIdent::new(
66+
name_ident.tenant().clone(),
67+
name_ident.data_mask_name().to_string(),
68+
);
6669

67-
let mut txn = TxnRequest::default();
70+
let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?;
6871

69-
let res = self.get_id_and_value(name_ident).await?;
70-
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");
71-
72-
let mut curr_seq = 0;
73-
74-
if let Some((seq_id, seq_meta)) = res {
75-
match req.create_option {
76-
CreateOption::Create => {
77-
return Err(AppError::DatamaskAlreadyExists(
78-
name_ident.exist_error(func_name!()),
79-
)
80-
.into());
81-
}
82-
CreateOption::CreateIfNotExists => {
83-
return Ok(CreateDatamaskReply { id: *seq_id.data });
84-
}
85-
CreateOption::CreateOrReplace => {
86-
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
87-
88-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
89-
90-
curr_seq = seq_id.seq;
91-
}
92-
};
93-
}
72+
let mut txn = TxnRequest::default();
9473

95-
// Create data mask by inserting these record:
96-
// name -> id
97-
// id -> policy
98-
// data mask name -> data mask table id list
99-
100-
let id = fetch_id(self, IdGenerator::data_mask_id()).await?;
101-
102-
let id = DataMaskId::new(id);
103-
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
104-
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());
105-
106-
debug!(
107-
id :? =(&id_ident),
108-
name_key :? =(name_ident);
109-
"new datamask id"
110-
);
111-
112-
{
113-
let meta: DatamaskMeta = req.data_mask_meta.clone();
114-
let id_list = MaskpolicyTableIdList::default();
115-
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
116-
txn.if_then.extend(vec![
117-
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
118-
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
119-
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
120-
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
121-
]);
122-
123-
let (succ, _responses) = send_txn(self, txn).await?;
124-
125-
debug!(
126-
name :? =(name_ident),
127-
id :? =(&id_ident),
128-
succ = succ;
129-
"create_data_mask"
130-
);
131-
132-
if succ {
133-
break id;
74+
let res = self.get_id_and_value(name_ident).await?;
75+
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");
76+
77+
let mut curr_seq = 0;
78+
79+
if let Some((seq_id, seq_meta)) = res {
80+
match req.create_option {
81+
CreateOption::Create => {
82+
return Ok(Err(
83+
name_ident.exist_error(format!("{} already exists", req.name))
84+
));
13485
}
135-
}
136-
};
86+
CreateOption::CreateIfNotExists => {
87+
return Ok(Ok(CreateDatamaskReply { id: *seq_id.data }));
88+
}
89+
CreateOption::CreateOrReplace => {
90+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
13791

138-
Ok(CreateDatamaskReply { id: *id })
92+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
93+
94+
curr_seq = seq_id.seq;
95+
}
96+
};
97+
}
98+
99+
// Create data mask by inserting these record:
100+
// name -> id
101+
// id -> policy
102+
// data mask name -> data mask table id list
103+
104+
let id = DataMaskId::new(masking_policy_id);
105+
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
106+
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());
107+
108+
debug!(
109+
id :? =(&id_ident),
110+
name_key :? =(name_ident);
111+
"new datamask id"
112+
);
113+
114+
{
115+
let meta: DatamaskMeta = req.data_mask_meta.clone();
116+
let id_list = MaskpolicyTableIdList::default();
117+
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
118+
txn.condition
119+
.push(txn_cond_eq_seq(&row_access_name_ident, 0));
120+
txn.if_then.extend(vec![
121+
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
122+
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
123+
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
124+
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
125+
]);
126+
}
127+
128+
let (succ, _responses) = send_txn(self, txn).await?;
129+
130+
debug!(
131+
name :? =(name_ident),
132+
id :? =(&id_ident),
133+
succ = succ;
134+
"create_data_mask"
135+
);
136+
137+
if succ {
138+
Ok(Ok(CreateDatamaskReply {
139+
id: masking_policy_id,
140+
}))
141+
} else {
142+
Ok(Err(
143+
name_ident.exist_error(format!("{} already exists", req.name))
144+
))
145+
}
139146
}
140147

141148
async fn drop_data_mask(
@@ -173,7 +180,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
173180
// Policy is in use - cannot drop
174181
if !table_policy_refs.is_empty() {
175182
return Ok(Err(MaskingPolicyError::policy_in_use(
176-
tenant.tenant_name().to_string(),
177183
name_ident.data_mask_name().to_string(),
178184
)));
179185
}

โ€Žsrc/meta/api/src/errors.rsโ€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub enum MaskingPolicyError {
4747
}
4848

4949
impl MaskingPolicyError {
50-
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
50+
pub fn policy_in_use(policy_name: impl Into<String>) -> Self {
5151
Self::PolicyInUse {
5252
policy_name: policy_name.into(),
5353
}
@@ -72,7 +72,7 @@ pub enum RowAccessPolicyError {
7272
}
7373

7474
impl RowAccessPolicyError {
75-
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
75+
pub fn policy_in_use(policy_name: impl Into<String>) -> Self {
7676
Self::PolicyInUse {
7777
policy_name: policy_name.into(),
7878
}

โ€Žsrc/meta/api/src/kv_fetch_util.rsโ€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ pub async fn fetch_id<T: kvapi::Key>(
160160
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
161161
generator: T,
162162
) -> Result<u64, MetaError> {
163+
// Each `upsert` bumps the seq atomically inside metasrv, therefore every caller
164+
// receives a unique, monotonically increasing id even when multiple sessions
165+
// fetch from the same generator concurrently.
163166
let res = kv_api
164167
.upsert_kv(UpsertKV::update(generator.to_string_key(), b""))
165168
.await?;

โ€Žsrc/meta/api/src/row_access_policy_api.rsโ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub trait RowAccessPolicyApi: Send + Sync {
3333
req: CreateRowAccessPolicyReq,
3434
) -> Result<
3535
Result<CreateRowAccessPolicyReply, ExistError<row_access_policy_name_ident::Resource>>,
36-
MetaTxnError,
36+
MetaError,
3737
>;
3838

3939
/// On success, returns the dropped id and row policy.

0 commit comments

Comments
ย (0)