Skip to content

Commit 870e534

Browse files
committed
delete loop
1 parent 13f59cd commit 870e534

File tree

3 files changed

+123
-124
lines changed

3 files changed

+123
-124
lines changed

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 68 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use databend_common_meta_app::app_error::AppError;
16+
use databend_common_meta_app::app_error::TxnRetryMaxTimes;
1617
use databend_common_meta_app::data_mask::CreateDatamaskReply;
1718
use databend_common_meta_app::data_mask::CreateDatamaskReq;
1819
use databend_common_meta_app::data_mask::DataMaskId;
@@ -74,84 +75,79 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
7475

7576
let masking_policy_id = fetch_id(self, IdGenerator::data_mask_id()).await?;
7677

77-
let mut trials = txn_backoff(None, func_name!());
78-
loop {
79-
trials.next().unwrap()?.await;
78+
let mut txn = TxnRequest::default();
8079

81-
let mut txn = TxnRequest::default();
80+
let res = self.get_id_and_value(name_ident).await?;
81+
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");
8282

83-
let res = self.get_id_and_value(name_ident).await?;
84-
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");
85-
86-
let mut curr_seq = 0;
87-
88-
if let Some((seq_id, seq_meta)) = res {
89-
match req.create_option {
90-
CreateOption::Create => {
91-
return Err(AppError::DatamaskAlreadyExists(
92-
name_ident.exist_error(func_name!()),
93-
)
94-
.into());
95-
}
96-
CreateOption::CreateIfNotExists => {
97-
return Ok(CreateDatamaskReply { id: *seq_id.data });
98-
}
99-
CreateOption::CreateOrReplace => {
100-
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
101-
102-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
103-
104-
curr_seq = seq_id.seq;
105-
}
106-
};
107-
}
83+
let mut curr_seq = 0;
10884

109-
// Create data mask by inserting these record:
110-
// name -> id
111-
// id -> policy
112-
// data mask name -> data mask table id list
113-
114-
let id = DataMaskId::new(masking_policy_id);
115-
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
116-
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());
117-
118-
debug!(
119-
id :? =(&id_ident),
120-
name_key :? =(name_ident);
121-
"new datamask id"
122-
);
123-
124-
{
125-
let meta: DatamaskMeta = req.data_mask_meta.clone();
126-
let id_list = MaskpolicyTableIdList::default();
127-
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
128-
txn.condition
129-
.push(txn_cond_eq_seq(&row_access_name_ident, 0));
130-
txn.if_then.extend(vec![
131-
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
132-
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
133-
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
134-
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
135-
]);
136-
137-
let (succ, _responses) = send_txn(self, txn).await?;
138-
139-
debug!(
140-
name :? =(name_ident),
141-
id :? =(&id_ident),
142-
succ = succ;
143-
"create_data_mask"
144-
);
145-
146-
if succ {
147-
break;
85+
if let Some((seq_id, seq_meta)) = res {
86+
match req.create_option {
87+
CreateOption::Create => {
88+
return Err(AppError::DatamaskAlreadyExists(
89+
name_ident.exist_error(func_name!()),
90+
)
91+
.into());
14892
}
149-
}
93+
CreateOption::CreateIfNotExists => {
94+
return Ok(CreateDatamaskReply { id: *seq_id.data });
95+
}
96+
CreateOption::CreateOrReplace => {
97+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
98+
99+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
100+
101+
curr_seq = seq_id.seq;
102+
}
103+
};
104+
}
105+
106+
// Create data mask by inserting these record:
107+
// name -> id
108+
// id -> policy
109+
// data mask name -> data mask table id list
110+
111+
let id = DataMaskId::new(masking_policy_id);
112+
let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id);
113+
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());
114+
115+
debug!(
116+
id :? =(&id_ident),
117+
name_key :? =(name_ident);
118+
"new datamask id"
119+
);
120+
121+
{
122+
let meta: DatamaskMeta = req.data_mask_meta.clone();
123+
let id_list = MaskpolicyTableIdList::default();
124+
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
125+
txn.condition
126+
.push(txn_cond_eq_seq(&row_access_name_ident, 0));
127+
txn.if_then.extend(vec![
128+
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
129+
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
130+
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
131+
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
132+
]);
150133
}
151134

152-
Ok(CreateDatamaskReply {
153-
id: masking_policy_id,
154-
})
135+
let (succ, _responses) = send_txn(self, txn).await?;
136+
137+
debug!(
138+
name :? =(name_ident),
139+
id :? =(&id_ident),
140+
succ = succ;
141+
"create_data_mask"
142+
);
143+
144+
if succ {
145+
Ok(CreateDatamaskReply {
146+
id: masking_policy_id,
147+
})
148+
} else {
149+
Err(KVAppError::from(TxnRetryMaxTimes::new(func_name!(), 1)))
150+
}
155151
}
156152

157153
async fn drop_data_mask(

src/meta/api/src/kv_fetch_util.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ pub async fn fetch_id<T: kvapi::Key>(
160160
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
161161
generator: T,
162162
) -> Result<u64, MetaError> {
163+
// Each `upsert` bumps the seq atomically inside metasrv, therefore every caller
164+
// receives a unique, monotonically increasing id even when multiple sessions
165+
// fetch from the same generator concurrently.
163166
let res = kv_api
164167
.upsert_kv(UpsertKV::update(generator.to_string_key(), b""))
165168
.await?;

src/meta/api/src/row_access_policy_api_impl.rs

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_meta_app::app_error::TxnRetryMaxTimes;
1516
use databend_common_meta_app::data_mask::DataMaskNameIdent;
1617
use databend_common_meta_app::id_generator::IdGenerator;
1718
use databend_common_meta_app::row_access_policy::row_access_policy_name_ident;
@@ -36,6 +37,7 @@ use log::debug;
3637

3738
use crate::errors::RowAccessPolicyError;
3839
use crate::fetch_id;
40+
use crate::kv_app_error::KVAppError;
3941
use crate::kv_pb_api::KVPbApi;
4042
use crate::meta_txn_error::MetaTxnError;
4143
use crate::row_access_policy_api::RowAccessPolicyApi;
@@ -73,68 +75,66 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
7375

7476
let row_access_id = fetch_id(self, IdGenerator::row_access_id()).await?;
7577
let policy_id = RowAccessPolicyId::new(row_access_id);
76-
let mut trials = txn_backoff(None, func_name!());
77-
loop {
78-
trials.next().unwrap()?.await;
79-
80-
let mut txn = TxnRequest::default();
78+
let mut txn = TxnRequest::default();
8179

82-
let res = self.get_id_and_value(name_ident).await?;
83-
debug!(res :? = res, name_key :? =(name_ident); "create_row_access");
80+
let res = self.get_id_and_value(name_ident).await?;
81+
debug!(res :? = res, name_key :? =(name_ident); "create_row_access");
8482

85-
let mut curr_seq = 0;
83+
let mut curr_seq = 0;
8684

87-
if let Some((seq_id, seq_meta)) = res {
88-
if req.can_replace {
89-
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
85+
if let Some((seq_id, seq_meta)) = res {
86+
if req.can_replace {
87+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
9088

91-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
89+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
9290

93-
// TODO(eason): need to remove row policy from table meta
91+
// TODO(eason): need to remove row policy from table meta
9492

95-
curr_seq = seq_id.seq;
96-
} else {
97-
return Ok(Err(name_ident.exist_error(func_name!())));
98-
}
93+
curr_seq = seq_id.seq;
94+
} else {
95+
return Ok(Err(name_ident.exist_error(func_name!())));
9996
}
97+
}
10098

101-
// Create row policy by inserting these record:
102-
// name -> id
103-
// id -> policy
104-
105-
let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id);
106-
107-
debug!(
108-
id :? =(&id_ident),
109-
name_key :? =(name_ident);
110-
"new RowAccessPolicy id"
111-
);
112-
113-
{
114-
let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone();
115-
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
116-
txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0));
117-
txn.if_then.extend(vec![
118-
txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id
119-
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
120-
]);
121-
122-
let (succ, _responses) = send_txn(self, txn).await?;
123-
124-
debug!(
125-
name :? =(name_ident),
126-
id :? =(&id_ident),
127-
succ = succ;
128-
"create_row_access"
129-
);
130-
131-
if succ {
132-
break;
133-
}
134-
}
99+
// Create row policy by inserting these record:
100+
// name -> id
101+
// id -> policy
102+
103+
let id_ident = RowAccessPolicyIdIdent::new_generic(name_ident.tenant(), policy_id);
104+
105+
debug!(
106+
id :? =(&id_ident),
107+
name_key :? =(name_ident);
108+
"new RowAccessPolicy id"
109+
);
110+
111+
{
112+
let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone();
113+
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
114+
txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0));
115+
txn.if_then.extend(vec![
116+
txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id
117+
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
118+
]);
135119
}
136120

137-
Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id }))
121+
let (succ, _responses) = send_txn(self, txn).await?;
122+
123+
debug!(
124+
name :? =(name_ident),
125+
id :? =(&id_ident),
126+
succ = succ;
127+
"create_row_access"
128+
);
129+
130+
if succ {
131+
Ok(Ok(CreateRowAccessPolicyReply { id: row_access_id }))
132+
} else {
133+
Err(MetaTxnError::TxnRetryMaxTimes(TxnRetryMaxTimes::new(
134+
func_name!(),
135+
1,
136+
)))
137+
}
138138
}
139139

140140
async fn drop_row_access_policy(

0 commit comments

Comments
 (0)