Skip to content

Commit 61ec13b

Browse files
emmaling27Convex, Inc.
authored andcommitted
Double write scheduled job args and read from the scheduled job args table (#43550)
Double writes scheduled job arguments to the `_scheduled_jobs` and `_scheduled_job_args` table. Adds a join to read arguments from the `_scheduled_job_args` table first, falling back on the `_scheduled_jobs` args if there is no `args_id` on the metadata. GitOrigin-RevId: 06b2d5a98422f2a747c4764bac7f345dc2018f53
1 parent 7490578 commit 61ec13b

File tree

7 files changed

+293
-77
lines changed

7 files changed

+293
-77
lines changed

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111
},
1212
};
1313

14+
use anyhow::Context;
1415
use common::{
1516
backoff::Backoff,
1617
components::{
@@ -83,6 +84,7 @@ use model::{
8384
scheduled_jobs::{
8485
types::{
8586
ScheduledJob,
87+
ScheduledJobMetadata,
8688
ScheduledJobState,
8789
},
8890
SchedulerModel,
@@ -345,14 +347,13 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
345347
) -> anyhow::Result<Option<Timestamp>> {
346348
let now = self.context.rt.generate_timestamp()?;
347349
let mut job_stream = self.context.stream_jobs_to_run(tx);
348-
while let Some(job) = job_stream.try_next().await? {
349-
let (job_id, job) = job.clone().into_id_and_value();
350+
while let Some((job_id, job)) = job_stream.try_next().await? {
350351
if self.running_job_ids.contains(&job_id) {
351352
continue;
352353
}
353354
let next_ts = job
354355
.next_ts
355-
.ok_or_else(|| anyhow::anyhow!("Could not get next_ts to run scheduled job at"))?;
356+
.context("Could not get next_ts to run scheduled job at")?;
356357
// If we can't execute the job return the job's target timestamp. If we're
357358
// caught up, we can sleep until the timestamp. If we're behind and
358359
// at our concurrency limit, we can use the timestamp to log how far
@@ -399,7 +400,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
399400
}
400401

401402
impl<RT: Runtime> ScheduledJobContext<RT> {
402-
#[try_stream(boxed, ok = ParsedDocument<ScheduledJob>, error = anyhow::Error)]
403+
#[try_stream(boxed, ok = (ResolvedDocumentId, ScheduledJob), error = anyhow::Error)]
403404
async fn stream_jobs_to_run<'a>(&'a self, tx: &'a mut Transaction<RT>) {
404405
let namespaces: Vec<_> = tx
405406
.table_mapping()
@@ -423,21 +424,30 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
423424
for namespace in namespaces {
424425
let mut query = ResolvedQuery::new(tx, namespace, index_query.clone())?;
425426
if let Some(doc) = query.next(tx, None).await? {
426-
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
427-
let next_ts = job.next_ts.ok_or_else(|| {
428-
anyhow::anyhow!("Could not get next_ts to run scheduled job {}", job.id())
427+
let job_metadata: ParsedDocument<ScheduledJobMetadata> = doc.parse()?;
428+
let job_metadata_id = job_metadata.id();
429+
let next_ts = job_metadata.next_ts.ok_or_else(|| {
430+
anyhow::anyhow!(
431+
"Could not get next_ts to run scheduled job {}",
432+
job_metadata.id()
433+
)
429434
})?;
430-
queries.insert((next_ts, namespace), (job, query));
435+
let mut model = SchedulerModel::new(tx, namespace);
436+
let job = model.scheduled_job_from_metadata(job_metadata).await?;
437+
queries.insert((next_ts, namespace), ((job_metadata_id, job), query));
431438
}
432439
}
433440
while let Some(((_min_next_ts, namespace), (min_job, mut query))) = queries.pop_first() {
434441
yield min_job;
435442
if let Some(doc) = query.next(tx, None).await? {
436-
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
437-
let next_ts = job.next_ts.ok_or_else(|| {
438-
anyhow::anyhow!("Could not get next_ts to run scheduled job {}", job.id())
443+
let job_metadata: ParsedDocument<ScheduledJobMetadata> = doc.parse()?;
444+
let job_metadata_id = job_metadata.id();
445+
let next_ts = job_metadata.next_ts.with_context(|| {
446+
format!("Could not get next_ts to run scheduled job {job_metadata_id}",)
439447
})?;
440-
queries.insert((next_ts, namespace), (job, query));
448+
let mut model = SchedulerModel::new(tx, namespace);
449+
let job = model.scheduled_job_from_metadata(job_metadata).await?;
450+
queries.insert((next_ts, namespace), ((job_metadata_id, job), query));
441451
}
442452
}
443453
}
@@ -469,11 +479,11 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
469479

470480
async fn schedule_retry(
471481
&self,
472-
mut job: ScheduledJob,
482+
job: ScheduledJob,
473483
job_id: ResolvedDocumentId,
474484
mut system_error: anyhow::Error,
475485
) -> anyhow::Result<()> {
476-
let (success, mut tx) = self
486+
let (success, mut tx, mut job) = self
477487
.new_transaction_for_job_state(job_id, &job, FunctionUsageTracker::new())
478488
.await?;
479489
if !success {
@@ -519,7 +529,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
519529
mutation_retry_count: usize,
520530
) -> anyhow::Result<()> {
521531
let usage_tracker = FunctionUsageTracker::new();
522-
let (success, mut tx) = self
532+
let (success, mut tx, _metadata) = self
523533
.new_transaction_for_job_state(job_id, &job, usage_tracker.clone())
524534
.await?;
525535
if !success {
@@ -739,7 +749,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
739749
// UDF failed due to developer error. It is not safe to commit the
740750
// transaction it executed in. We should remove the job in a new
741751
// transaction.
742-
let (success, mut tx) = self
752+
let (success, mut tx, _metadata) = self
743753
.new_transaction_for_job_state(job_id, &job, usage_tracker.clone())
744754
.await?;
745755
if !success {
@@ -782,7 +792,9 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
782792
usage_tracker: FunctionUsageTracker,
783793
) -> anyhow::Result<()> {
784794
let identity = tx.identity().clone();
785-
let mut tx = self.database.begin(identity.clone()).await?;
795+
let (_success, mut tx, metadata) = self
796+
.new_transaction_for_job_state(job_id, &job, usage_tracker.clone())
797+
.await?;
786798
let namespace = tx.table_mapping().tablet_namespace(job_id.tablet_id)?;
787799
match job.state {
788800
ScheduledJobState::Pending => {
@@ -792,7 +804,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
792804
sentry::configure_scope(|scope| context.add_sentry_tags(scope));
793805

794806
// Set state to in progress
795-
let mut updated_job = job.clone();
807+
let mut updated_job = metadata.clone();
796808
updated_job.state = ScheduledJobState::InProgress {
797809
request_id: Some(context.request_id.clone()),
798810
execution_id: Some(context.execution_id),
@@ -895,32 +907,61 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
895907
job_id: ResolvedDocumentId,
896908
expected_state: &ScheduledJob,
897909
usage_tracker: FunctionUsageTracker,
898-
) -> anyhow::Result<(bool, Transaction<RT>)> {
910+
) -> anyhow::Result<(bool, Transaction<RT>, ScheduledJobMetadata)> {
911+
let mut tx = self
912+
.database
913+
.begin_with_usage(Identity::Unknown(None), usage_tracker)
914+
.await?;
915+
// Verify that the scheduled job has not changed.
916+
let metadata = tx
917+
.get(job_id)
918+
.await?
919+
.map(ParseDocument::<ScheduledJobMetadata>::parse)
920+
.transpose()?
921+
.map(|j| j.into_value())
922+
.with_context(|| {
923+
format!("Missing scheduled jobs metadata document with id {job_id}")
924+
})?;
925+
Ok((expected_state.matches_metadata(&metadata), tx, metadata))
926+
}
927+
928+
// FIXME: Combine with new_transaction_for_job_state or remove one. They do
929+
// basically the same thing, just one takes `ScheduledJobMetadata` and the
930+
// other takes `ScheduledJob`.
931+
async fn new_transaction_for_job_metadata(
932+
&self,
933+
job_id: ResolvedDocumentId,
934+
expected_state: &ScheduledJobMetadata,
935+
usage_tracker: FunctionUsageTracker,
936+
) -> anyhow::Result<(bool, Transaction<RT>, ScheduledJobMetadata)> {
899937
let mut tx = self
900938
.database
901939
.begin_with_usage(Identity::Unknown(None), usage_tracker)
902940
.await?;
903941
// Verify that the scheduled job has not changed.
904-
let new_job = tx
942+
let metadata = tx
905943
.get(job_id)
906944
.await?
907-
.map(ParseDocument::<ScheduledJob>::parse)
945+
.map(ParseDocument::<ScheduledJobMetadata>::parse)
908946
.transpose()?
909-
.map(|j| j.into_value());
910-
Ok((new_job.as_ref() == Some(expected_state), tx))
947+
.map(|j| j.into_value())
948+
.with_context(|| {
949+
format!("Missing scheduled jobs metadata document with id {job_id}")
950+
})?;
951+
Ok((expected_state == &metadata, tx, metadata))
911952
}
912953

913954
// Completes an action in separate transaction. Returns false if the action
914955
// state has changed.
915956
async fn complete_action(
916957
&self,
917958
job_id: ResolvedDocumentId,
918-
expected_state: &ScheduledJob,
959+
expected_state: &ScheduledJobMetadata,
919960
usage_tracking: FunctionUsageTracker,
920961
job_state: ScheduledJobState,
921962
) -> anyhow::Result<()> {
922-
let (success, mut tx) = self
923-
.new_transaction_for_job_state(job_id, expected_state, usage_tracking)
963+
let (success, mut tx, _metadata) = self
964+
.new_transaction_for_job_metadata(job_id, expected_state, usage_tracking)
924965
.await?;
925966
if !success {
926967
// Continue without updating since the job state has changed
@@ -987,7 +1028,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
9871028

9881029
let mut jobs_to_delete = vec![];
9891030
while let Some(doc) = query_stream.next(&mut tx, None).await? {
990-
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
1031+
let job: ParsedDocument<ScheduledJobMetadata> = doc.parse()?;
9911032
match job.state {
9921033
ScheduledJobState::Success => (),
9931034
ScheduledJobState::Failed(_) => (),

crates/application/src/test_helpers.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use common::{
2424
},
2525
components::ComponentId,
2626
db_schema,
27+
document::ParsedDocument,
2728
http::fetch::StaticFetchClient,
2829
knobs::{
2930
ACTION_USER_TIMEOUT,
@@ -89,7 +90,10 @@ use model::{
8990
ExportsModel,
9091
},
9192
initialize_application_system_tables,
92-
scheduled_jobs::types::ScheduledJob,
93+
scheduled_jobs::{
94+
types::ScheduledJobMetadata,
95+
SchedulerModel,
96+
},
9397
udf_config::types::UdfConfig,
9498
virtual_system_mapping,
9599
};
@@ -154,8 +158,7 @@ pub trait ApplicationTestExt<RT: Runtime> {
154158
) -> anyhow::Result<Application<RT>>;
155159
async fn test_one_off_scheduled_job_executor_run(
156160
&self,
157-
job: ScheduledJob,
158-
job_id: ResolvedDocumentId,
161+
job: ParsedDocument<ScheduledJobMetadata>,
159162
) -> anyhow::Result<()>;
160163
/// Load the modules from npm-packages/udf-tests
161164
async fn load_udf_tests_modules(&self) -> anyhow::Result<()>;
@@ -300,16 +303,22 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
300303

301304
async fn test_one_off_scheduled_job_executor_run(
302305
&self,
303-
job: ScheduledJob,
304-
job_id: ResolvedDocumentId,
306+
metadata: ParsedDocument<ScheduledJobMetadata>,
305307
) -> anyhow::Result<()> {
306308
let test_executor = ScheduledJobContext::new(
307309
self.runtime.clone(),
308310
self.database.clone(),
309311
self.runner.clone(),
310312
self.function_log.clone(),
311313
);
312-
test_executor.execute_job(job, job_id).await;
314+
let mut tx = self.begin(Identity::system()).await?;
315+
let namespace = tx
316+
.table_mapping()
317+
.tablet_namespace(metadata.id().tablet_id)?;
318+
let metadata_id = metadata.id();
319+
let mut model = SchedulerModel::new(&mut tx, namespace);
320+
let job = model.scheduled_job_from_metadata(metadata).await?;
321+
test_executor.execute_job(job, metadata_id).await;
313322
Ok(())
314323
}
315324

crates/application/src/tests/scheduled_jobs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ async fn test_scheduled_jobs_race_condition(rt: TestRuntime) -> anyhow::Result<(
169169
let (_job_id, mut model) = create_scheduled_job(&rt, &mut tx, path.clone()).await?;
170170
let jobs = model.list().await?;
171171
assert_eq!(jobs.len(), 1);
172-
let (job_id, job) = jobs[0].clone().into_id_and_value();
172+
let job = jobs[0].clone();
173173

174174
// Cancel the scheduled job
175175
model.cancel_all(Some(path), 1, None, None).await?;
@@ -180,7 +180,7 @@ async fn test_scheduled_jobs_race_condition(rt: TestRuntime) -> anyhow::Result<(
180180
// execute after the job was created but before it was canceled. We should
181181
// handle the race condition gracefully.
182182
application
183-
.test_one_off_scheduled_job_executor_run(job, job_id)
183+
.test_one_off_scheduled_job_executor_run(job)
184184
.await?;
185185
Ok(())
186186
}

crates/common/src/virtual_system_mapping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub trait VirtualSystemDocMapper: Send + Sync {
4141
/// (implemented by `Transaction`) to convert system documents joined across
4242
/// multiple system tables to virtual documents in `VirtualSystemDocMapper`.
4343
#[async_trait]
44-
pub trait GetDocument {
44+
pub trait GetDocument: Send + Sync {
4545
async fn get_document(
4646
&mut self,
4747
id: ResolvedDocumentId,

0 commit comments

Comments
 (0)