Skip to content

Commit 64fd422

Browse files
jordanhunt22Convex, Inc.
authored andcommitted
[Scheduled Jobs] Add timer + minor optimization (#34367)
Adds a timer for the `scheduled_job` loop + adds a minor optimization to drain many scheduled jobs at once. GitOrigin-RevId: 091e1c45416501dad3f23edfbc2726bb2fd745c6
1 parent 0f63542 commit 64fd422

File tree

2 files changed

+22
-27
lines changed

2 files changed

+22
-27
lines changed

crates/application/src/scheduled_jobs/metrics.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ use metrics::{
55
log_counter_with_labels,
66
log_distribution_with_labels,
77
log_gauge,
8+
prometheus::VMHistogram,
89
register_convex_counter,
910
register_convex_gauge,
1011
register_convex_histogram,
1112
StaticMetricLabel,
13+
Timer,
1214
STATUS_LABEL,
1315
};
1416

@@ -63,3 +65,11 @@ register_convex_gauge!(
6365
pub fn log_num_running_jobs(num_running: usize) {
6466
log_gauge(&SCHEDULED_JOB_NUM_RUNNING_TOTAL, num_running as f64);
6567
}
68+
69+
register_convex_histogram!(
70+
RUN_SCHEDULED_JOBS_LOOP_SECONDS,
71+
"Time to run a single loop of the scheduled job executor",
72+
);
73+
pub fn run_scheduled_jobs_loop() -> Timer<VMHistogram> {
74+
Timer::new(&RUN_SCHEDULED_JOBS_LOOP_SECONDS)
75+
}

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use common::{
4747
Runtime,
4848
SpawnHandle,
4949
},
50-
tokio::task::yield_now,
5150
types::{
5251
FunctionCaller,
5352
UdfType,
@@ -165,12 +164,6 @@ pub struct ScheduledJobContext<RT: Runtime> {
165164
function_log: FunctionExecutionLog<RT>,
166165
}
167166

168-
/// This roughly matches tokio's permits that it uses as part of cooperative
169-
/// scheduling. We shouldn't use this for anything sophisticated, it's just a
170-
/// simple way for us to yield occasionally for scheduled jobs but not yield too
171-
/// often. We really don't need anything fancy here.
172-
const CHECKS_BETWEEN_YIELDS: usize = 128;
173-
174167
impl<RT: Runtime> ScheduledJobExecutor<RT> {
175168
pub fn start(
176169
rt: RT,
@@ -219,21 +212,6 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
219212
}
220213
}
221214

222-
async fn drain_finished_jobs(
223-
running_job_ids: &mut HashSet<ResolvedDocumentId>,
224-
rx: &mut mpsc::Receiver<ResolvedDocumentId>,
225-
) {
226-
let mut total_drained = 0;
227-
while let Ok(job_id) = rx.try_recv() {
228-
total_drained += 1;
229-
running_job_ids.remove(&job_id);
230-
if total_drained % CHECKS_BETWEEN_YIELDS == 0 {
231-
yield_now().await;
232-
}
233-
}
234-
tracing::debug!("Drained {total_drained} finished scheduled jobs from the channel");
235-
}
236-
237215
async fn run(&mut self, backoff: &mut Backoff) -> anyhow::Result<()> {
238216
tracing::info!("Starting scheduled job executor");
239217
let pause_client = self.context.rt.pause_client();
@@ -243,7 +221,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
243221
// Some if there's at least one pending job. May be in the past!
244222
let mut next_job_ready_time = None;
245223
loop {
246-
Self::drain_finished_jobs(&mut running_job_ids, &mut job_finished_rx).await;
224+
let _timer = metrics::run_scheduled_jobs_loop();
247225

248226
let mut tx = self.database.begin(Identity::Unknown).await?;
249227
let backend_state = BackendStateModel::new(&mut tx).get_backend_state().await?;
@@ -282,11 +260,18 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
282260
let token = tx.into_token()?;
283261
let subscription = self.database.subscribe(token).await?;
284262

263+
let mut job_ids: Vec<_> = Vec::new();
285264
select_biased! {
286-
job_id = job_finished_rx.recv().fuse() => {
287-
if let Some(job_id) = job_id {
288-
pause_client.wait(SCHEDULED_JOB_EXECUTED).await;
289-
running_job_ids.remove(&job_id);
265+
num_jobs = job_finished_rx
266+
.recv_many(&mut job_ids, *SCHEDULED_JOB_EXECUTION_PARALLELISM)
267+
.fuse() => {
268+
// `recv_many()` returns the number of jobs received. If this number is 0,
269+
// then the channel has been closed.
270+
if num_jobs > 0 {
271+
for job_id in job_ids {
272+
pause_client.wait(SCHEDULED_JOB_EXECUTED).await;
273+
running_job_ids.remove(&job_id);
274+
}
290275
} else {
291276
anyhow::bail!("Job results channel closed, this is unexpected!");
292277
}

0 commit comments

Comments
 (0)