Skip to content

Commit 9d22547

Browse files
ldanilekConvex, Inc.
authored andcommitted
lazily parse scheduled job args (#31053)
currently we parse udf args from Bytes -> ConvexArray whenever we read from the `_scheduled_jobs` table. This can be unnecessary, for example `SchedulerModel::complete` reads the job, parses it, changes the state, and replaces the document. In that flow we don't need to parse and re-serialize udf arguments. Only parse arguments when necessary: when returning a virtual table document, when running the function, or when reporting a system error to the logs. GitOrigin-RevId: bdfadf8ea2adaa9e1ec3d42fbcb60397314f9aed
1 parent e6b9f48 commit 9d22547

File tree

4 files changed

+70
-40
lines changed

4 files changed

+70
-40
lines changed

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
536536
self.function_log.log_mutation_system_error(
537537
&error,
538538
path,
539-
job.udf_args,
539+
job.udf_args()?,
540540
identity,
541541
self.rt.monotonic_now(),
542542
caller,
@@ -585,7 +585,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
585585
)
586586
.into(),
587587
path,
588-
job.udf_args,
588+
job.udf_args()?,
589589
identity,
590590
self.rt.monotonic_now(),
591591
caller,
@@ -603,7 +603,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
603603
)
604604
.into(),
605605
path,
606-
job.udf_args,
606+
job.udf_args()?,
607607
identity,
608608
self.rt.monotonic_now(),
609609
caller,
@@ -636,12 +636,13 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
636636
let namespace = tx.table_mapping().tablet_namespace(job_id.tablet_id)?;
637637
let path = job.path.clone();
638638

639+
let udf_args = job.udf_args()?;
639640
let result = self
640641
.runner
641642
.run_mutation_no_udf_log(
642643
tx,
643644
PublicFunctionPath::Component(path.clone()),
644-
job.udf_args.clone(),
645+
udf_args.clone(),
645646
caller.allowed_visibility(),
646647
context.clone(),
647648
)
@@ -650,13 +651,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
650651
Ok(r) => r,
651652
Err(e) => {
652653
self.function_log.log_mutation_system_error(
653-
&e,
654-
path,
655-
job.udf_args.clone(),
656-
identity,
657-
start,
658-
caller,
659-
context,
654+
&e, path, udf_args, identity, start, caller, context,
660655
)?;
661656
return Err(e);
662657
},
@@ -751,7 +746,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
751746
.runner
752747
.run_action_no_udf_log(
753748
PublicFunctionPath::Component(path),
754-
job.udf_args,
749+
job.udf_args()?,
755750
identity,
756751
caller,
757752
usage_tracker.clone(),
@@ -800,7 +795,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
800795
self.function_log.log_action_system_error(
801796
&JsError::from_message(message).into(),
802797
path,
803-
job.udf_args.clone(),
798+
job.udf_args()?,
804799
identity.into(),
805800
self.rt.monotonic_now(),
806801
caller,

crates/model/src/scheduled_jobs/mod.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -219,18 +219,18 @@ impl<'a, RT: Runtime> SchedulerModel<'a, RT> {
219219

220220
let now: Timestamp = self.tx.runtime().generate_timestamp()?;
221221
let original_scheduled_ts: Timestamp = ts.as_system_time().try_into()?;
222-
let scheduled_job = ScheduledJob {
223-
path: path.clone(),
224-
udf_args: args.clone(),
225-
state: ScheduledJobState::Pending,
222+
let scheduled_job = ScheduledJob::new(
223+
path.clone(),
224+
args.clone(),
225+
ScheduledJobState::Pending,
226226
// Don't set next_ts in the past to avoid scheduler incorrectly logging
227227
// it is falling behind. We should keep `original_scheduled_ts` intact
228228
// since this is exposed to the developer via the virtual table.
229-
next_ts: Some(original_scheduled_ts.max(now)),
230-
completed_ts: None,
229+
Some(original_scheduled_ts.max(now)),
230+
None,
231231
original_scheduled_ts,
232-
attempts: ScheduledJobAttempts::default(),
233-
};
232+
ScheduledJobAttempts::default(),
233+
)?;
234234
let job = if let Some(parent_scheduled_job) = context.parent_scheduled_job {
235235
let table_mapping = self.tx.table_mapping();
236236
let parent_scheduled_job = parent_scheduled_job
@@ -245,15 +245,15 @@ impl<'a, RT: Runtime> SchedulerModel<'a, RT> {
245245
| ScheduledJobState::Success => scheduled_job,
246246
ScheduledJobState::Canceled => {
247247
let scheduled_ts = self.tx.begin_timestamp();
248-
ScheduledJob {
248+
ScheduledJob::new(
249249
path,
250-
udf_args: args,
251-
state: ScheduledJobState::Canceled,
252-
next_ts: None,
253-
completed_ts: Some(*scheduled_ts),
254-
original_scheduled_ts: *scheduled_ts,
255-
attempts: ScheduledJobAttempts::default(),
256-
}
250+
args,
251+
ScheduledJobState::Canceled,
252+
None,
253+
Some(*scheduled_ts),
254+
*scheduled_ts,
255+
ScheduledJobAttempts::default(),
256+
)?
257257
},
258258
}
259259
} else {

crates/model/src/scheduled_jobs/types.rs

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ pub struct ScheduledJob {
2727
pub path: CanonicalizedComponentFunctionPath,
2828
#[cfg_attr(
2929
any(test, feature = "testing"),
30-
proptest(strategy = "proptest::arbitrary::any_with::<ConvexArray>((0..4).into())")
30+
proptest(
31+
strategy = "proptest::arbitrary::any_with::<ConvexArray>((0..4).into()).\
32+
prop_map(args_to_bytes).prop_filter_map(\"invalid json\", |b| b.ok())"
33+
)
3134
)]
32-
pub udf_args: ConvexArray,
35+
pub udf_args_bytes: ByteBuf,
3336

3437
pub state: ScheduledJobState,
3538

@@ -47,11 +50,47 @@ pub struct ScheduledJob {
4750
pub attempts: ScheduledJobAttempts,
4851
}
4952

53+
fn args_to_bytes(args: ConvexArray) -> anyhow::Result<ByteBuf> {
54+
let args_json = JsonValue::from(args);
55+
let args_bytes = serde_json::to_vec(&args_json)?;
56+
Ok(ByteBuf::from(args_bytes))
57+
}
58+
59+
impl ScheduledJob {
60+
pub fn new(
61+
path: CanonicalizedComponentFunctionPath,
62+
udf_args: ConvexArray,
63+
state: ScheduledJobState,
64+
next_ts: Option<Timestamp>,
65+
completed_ts: Option<Timestamp>,
66+
original_scheduled_ts: Timestamp,
67+
attempts: ScheduledJobAttempts,
68+
) -> anyhow::Result<Self> {
69+
Ok(Self {
70+
path,
71+
udf_args_bytes: args_to_bytes(udf_args)?,
72+
state,
73+
next_ts,
74+
completed_ts,
75+
original_scheduled_ts,
76+
attempts,
77+
})
78+
}
79+
80+
pub fn udf_args(&self) -> anyhow::Result<ConvexArray> {
81+
let args_json: JsonValue = serde_json::from_slice(&self.udf_args_bytes)?;
82+
let args = args_json.try_into()?;
83+
Ok(args)
84+
}
85+
}
86+
5087
#[derive(Debug, Serialize, Deserialize)]
5188
#[serde(rename_all = "camelCase")]
5289
struct SerializedScheduledJob {
5390
component: Option<String>,
5491
udf_path: String,
92+
// Serialize the udf arguments as binary since we restrict what
93+
// field names can be used in a `Document`'s top-level object.
5594
udf_args: ByteBuf,
5695
state: SerializedScheduledJobState,
5796
next_ts: Option<i64>,
@@ -64,14 +103,10 @@ impl TryFrom<ScheduledJob> for SerializedScheduledJob {
64103
type Error = anyhow::Error;
65104

66105
fn try_from(job: ScheduledJob) -> anyhow::Result<Self> {
67-
// Serialize the udf arguments as binary since we restrict what
68-
// field names can be used in a `Document`'s top-level object.
69-
let udf_args_json = JsonValue::from(job.udf_args);
70-
let udf_args_bytes = serde_json::to_vec(&udf_args_json)?;
71106
Ok(SerializedScheduledJob {
72107
component: Some(String::from(job.path.component)),
73108
udf_path: String::from(job.path.udf_path),
74-
udf_args: ByteBuf::from(udf_args_bytes),
109+
udf_args: job.udf_args_bytes,
75110
state: job.state.try_into()?,
76111
next_ts: job.next_ts.map(|ts| ts.into()),
77112
completed_ts: job.completed_ts.map(|ts| ts.into()),
@@ -91,8 +126,7 @@ impl TryFrom<SerializedScheduledJob> for ScheduledJob {
91126
.transpose()?
92127
.unwrap_or_else(ComponentPath::root);
93128
let udf_path = value.udf_path.parse()?;
94-
let udf_args_json: JsonValue = serde_json::from_slice(&value.udf_args)?;
95-
let udf_args = udf_args_json.try_into()?;
129+
let udf_args_bytes = value.udf_args;
96130
let state = value.state.try_into()?;
97131
let next_ts = value.next_ts.map(|ts| ts.try_into()).transpose()?;
98132
let completed_ts = value.completed_ts.map(|ts| ts.try_into()).transpose()?;
@@ -114,7 +148,7 @@ impl TryFrom<SerializedScheduledJob> for ScheduledJob {
114148
component,
115149
udf_path,
116150
},
117-
udf_args,
151+
udf_args_bytes,
118152
state,
119153
next_ts,
120154
completed_ts,

crates/model/src/scheduled_jobs/virtual_table.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ impl VirtualSystemDocMapper for ScheduledJobsDocMapper {
6060

6161
let job: ParsedDocument<ScheduledJob> = doc.clone().try_into()?;
6262
let job: ScheduledJob = job.into_value();
63+
let udf_args = job.udf_args()?;
6364
let public_job = PublicScheduledJob {
6465
// TODO(ENG-6920) include component (job.path.component) in virtual table.
6566
name: job.path.udf_path,
66-
args: job.udf_args,
67+
args: udf_args,
6768
state: job.state,
6869
scheduled_time: timestamp_to_ms(job.original_scheduled_ts)?,
6970
completed_time: match job.completed_ts {

0 commit comments

Comments
 (0)