Skip to content

Commit 5d55724

Browse files
committed
refactor: try drop old udf
In eariler version, udf is serialize as json. In current version we can not drop/list these udfs. show user functions; 2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp)) while list UDFs drop function IF EXISTS plusp; 2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp)) So if drop udf return err, we directly drop the kv.
1 parent 00f4bd2 commit 5d55724

File tree

2 files changed

+45
-5
lines changed

2 files changed

+45
-5
lines changed

src/query/management/src/udf/udf_mgr.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName;
2727
use databend_common_meta_types::seq_value::SeqV;
2828
use databend_common_meta_types::MatchSeq;
2929
use databend_common_meta_types::MetaError;
30+
use databend_common_meta_types::UpsertKV;
3031
use databend_common_meta_types::With;
3132
use futures::TryStreamExt;
3233

@@ -155,16 +156,26 @@ impl UdfMgr {
155156
seq: MatchSeq,
156157
) -> Result<Option<SeqV<UserDefinedFunction>>, MetaError> {
157158
let key = UdfIdent::new(&self.tenant, udf_name);
158-
let req = UpsertPB::delete(key).with(seq);
159-
let res = self.kv_api.upsert_pb(&req).await?;
160-
161-
if res.is_changed() {
162-
Ok(res.prev)
159+
let req = UpsertPB::delete(key.clone()).with(seq);
160+
if let Ok(res) = self.kv_api.upsert_pb(&req).await {
161+
if res.is_changed() {
162+
Ok(res.prev)
163+
} else {
164+
Ok(None)
165+
}
163166
} else {
167+
self.try_drop_old_udf(&key).await?;
164168
Ok(None)
165169
}
166170
}
167171

172+
#[async_backtrace::framed]
173+
#[fastrace::trace]
174+
pub async fn try_drop_old_udf(&self, key: &UdfIdent) -> Result<(), MetaError> {
175+
let _res = self.kv_api.upsert_kv(UpsertKV::delete(key)).await?;
176+
Ok(())
177+
}
178+
168179
fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> {
169180
if is_builtin_function(name) {
170181
return Err(UdfError::Exists {

src/query/management/tests/it/udf.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta;
2828
use databend_common_meta_kvapi::kvapi::KVApi;
2929
use databend_common_meta_types::seq_value::SeqV;
3030
use databend_common_meta_types::MatchSeq;
31+
use databend_common_meta_types::Operation;
32+
use databend_common_meta_types::UpsertKV;
3133

3234
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
3335
async fn test_add_udf() -> Result<()> {
@@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> {
109111
Ok(())
110112
}
111113

114+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
115+
async fn test_drop_old_udf() -> Result<()> {
116+
let (kv_api, udf_api) = new_udf_api().await?;
117+
118+
// lambda udf
119+
let udf = create_test_lambda_udf();
120+
let udf_key = format!("__fd_udfs/admin/{}", udf.name);
121+
122+
let v = serde_json::to_vec("test")?;
123+
let kv_api = kv_api.clone();
124+
let _upsert_kv = kv_api
125+
.upsert_kv(UpsertKV::new(
126+
&udf_key,
127+
MatchSeq::Exact(0),
128+
Operation::Update(v),
129+
None,
130+
))
131+
.await?;
132+
let err = udf_api.list_udf().await.is_err();
133+
assert!(err);
134+
135+
udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?;
136+
let udfs = udf_api.list_udf().await?;
137+
assert_eq!(udfs, vec![]);
138+
Ok(())
139+
}
140+
112141
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
113142
async fn test_already_exists_add_udf() -> Result<()> {
114143
let (_, udf_api) = new_udf_api().await?;

0 commit comments

Comments
 (0)