Skip to content

Commit 8626ae4

Browse files
nikolajlauridsenCopilotZeegaan
authored
Distributed Background Jobs: Improve distributed background job locking behavior and performance (#21100)
* Don't take jobs that's already running and handle stale jobs * Add tests * Add bulk insert and delete methods to repository * Optimize EnsureJobsAsync to use batch inserts * Add EnsureJobs tests * Remember to keep old constructor * Minor cleanup * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * Apply suggestions from code review Co-authored-by: Nikolaj Geisle <[email protected]> --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Nikolaj Geisle <[email protected]>
1 parent f8d6a97 commit 8626ae4

File tree

6 files changed

+467
-30
lines changed

6 files changed

+467
-30
lines changed

src/Umbraco.Core/Configuration/Models/DistributedJobSettings.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class DistributedJobSettings
1010
{
1111
internal const string StaticPeriod = "00:00:05";
1212
internal const string StaticDelay = "00:01:00";
13+
internal const string StaticMaxExecutionTime = "00:05:00";
1314

1415
/// <summary>
1516
/// Gets or sets a value for the period of checking if there are any runnable distributed jobs.
@@ -22,4 +23,11 @@ public class DistributedJobSettings
2223
/// </summary>
2324
[DefaultValue(StaticDelay)]
2425
public TimeSpan Delay { get; set; } = TimeSpan.Parse(StaticDelay);
26+
27+
/// <summary>
28+
/// Gets or sets the maximum execution time for a distributed job before it is considered timed out.
29+
/// When a job exceeds this time, it is considered stale and can be picked up by another server for recovery and restarted.
30+
/// </summary>
31+
[DefaultValue(StaticMaxExecutionTime)]
32+
public TimeSpan MaximumExecutionTime { get; set; } = TimeSpan.Parse(StaticMaxExecutionTime);
2533
}

src/Umbraco.Infrastructure/BackgroundJobs/DistributedBackgroundJobHostedService.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Diagnostics;
2-
using Microsoft.Extensions.Hosting;
1+
using Microsoft.Extensions.Hosting;
32
using Microsoft.Extensions.Logging;
43
using Microsoft.Extensions.Options;
54
using Umbraco.Cms.Core;
@@ -22,10 +21,6 @@ public class DistributedBackgroundJobHostedService : BackgroundService
2221
/// <summary>
2322
/// Initializes a new instance of the <see cref="DistributedBackgroundJobHostedService"/> class.
2423
/// </summary>
25-
/// <param name="logger"></param>
26-
/// <param name="runtimeState"></param>
27-
/// <param name="distributedJobService"></param>
28-
/// <param name="distributedJobSettings"></param>
2924
public DistributedBackgroundJobHostedService(
3025
ILogger<DistributedBackgroundJobHostedService> logger,
3126
IRuntimeState runtimeState,

src/Umbraco.Infrastructure/Persistence/Repositories/IDistributedJobRepository.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,30 @@ public interface IDistributedJobRepository
3333
/// Deletes a job.
3434
/// </summary>
3535
void Delete(DistributedBackgroundJobModel distributedBackgroundJob);
36+
37+
/// <summary>
38+
/// Adds multiple jobs in a single batch operation.
39+
/// </summary>
40+
/// <param name="jobs">The jobs to add.</param>
41+
void Add(IEnumerable<DistributedBackgroundJobModel> jobs)
42+
{
43+
// TODO: Delete default implementation in V18
44+
foreach (DistributedBackgroundJobModel job in jobs)
45+
{
46+
Add(job);
47+
}
48+
}
49+
50+
/// <summary>
51+
/// Deletes multiple jobs in a single batch operation.
52+
/// </summary>
53+
/// <param name="jobs">The jobs to delete.</param>
54+
void Delete(IEnumerable<DistributedBackgroundJobModel> jobs)
55+
{
56+
// TODO: Delete default implementation in V18
57+
foreach (DistributedBackgroundJobModel job in jobs)
58+
{
59+
Delete(job);
60+
}
61+
}
3662
}

src/Umbraco.Infrastructure/Persistence/Repositories/Implement/DistributedJobRepository.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,40 @@ public void Delete(DistributedBackgroundJobModel distributedBackgroundJob)
8787
}
8888
}
8989

90+
/// <inheritdoc/>
91+
public void Add(IEnumerable<DistributedBackgroundJobModel> jobs)
92+
{
93+
if (scopeAccessor.AmbientScope is null)
94+
{
95+
throw new InvalidOperationException("No scope, could not add distributed jobs");
96+
}
97+
98+
IEnumerable<DistributedJobDto> dtos = jobs.Select(MapToDto);
99+
scopeAccessor.AmbientScope.Database.InsertBulk(dtos);
100+
}
101+
102+
/// <inheritdoc/>
103+
public void Delete(IEnumerable<DistributedBackgroundJobModel> jobs)
104+
{
105+
if (scopeAccessor.AmbientScope is null)
106+
{
107+
throw new InvalidOperationException("No scope, could not delete distributed jobs");
108+
}
109+
110+
var jobIds = jobs.Select(x => x.Id).ToArray();
111+
if (jobIds.Length is 0)
112+
{
113+
return;
114+
}
115+
116+
Sql<ISqlContext> sql = scopeAccessor.AmbientScope.SqlContext.Sql()
117+
.Delete()
118+
.From<DistributedJobDto>()
119+
.WhereIn<DistributedJobDto>(x => x.Id, jobIds);
120+
121+
scopeAccessor.AmbientScope.Database.Execute(sql);
122+
}
123+
90124
private DistributedJobDto MapToDto(DistributedBackgroundJobModel model) =>
91125
new()
92126
{

src/Umbraco.Infrastructure/Services/Implement/DistributedJobService.cs

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
using Microsoft.Extensions.Logging;
1+
using Microsoft.Extensions.DependencyInjection;
2+
using Microsoft.Extensions.Logging;
3+
using Microsoft.Extensions.Options;
24
using Umbraco.Cms.Core;
5+
using Umbraco.Cms.Core.Configuration.Models;
6+
using Umbraco.Cms.Core.DependencyInjection;
37
using Umbraco.Cms.Core.Scoping;
48
using Umbraco.Cms.Infrastructure.BackgroundJobs;
59
using Umbraco.Cms.Infrastructure.Models;
@@ -14,24 +18,41 @@ public class DistributedJobService : IDistributedJobService
1418
private readonly IDistributedJobRepository _distributedJobRepository;
1519
private readonly IEnumerable<IDistributedBackgroundJob> _distributedBackgroundJobs;
1620
private readonly ILogger<DistributedJobService> _logger;
21+
private readonly DistributedJobSettings _settings;
1722

1823
/// <summary>
1924
/// Initializes a new instance of the <see cref="DistributedJobService"/> class.
2025
/// </summary>
21-
/// <param name="coreScopeProvider"></param>
22-
/// <param name="distributedJobRepository"></param>
23-
/// <param name="distributedBackgroundJobs"></param>
24-
/// <param name="logger"></param>
26+
[Obsolete("Use the constructor that accepts IOptions<DistributedJobSettings>. Scheduled for removal in V18.")]
2527
public DistributedJobService(
2628
ICoreScopeProvider coreScopeProvider,
2729
IDistributedJobRepository distributedJobRepository,
2830
IEnumerable<IDistributedBackgroundJob> distributedBackgroundJobs,
2931
ILogger<DistributedJobService> logger)
32+
: this(
33+
coreScopeProvider,
34+
distributedJobRepository,
35+
distributedBackgroundJobs,
36+
logger,
37+
StaticServiceProvider.Instance.GetRequiredService<IOptions<DistributedJobSettings>>())
38+
{
39+
}
40+
41+
/// <summary>
42+
/// Initializes a new instance of the <see cref="DistributedJobService"/> class.
43+
/// </summary>
44+
public DistributedJobService(
45+
ICoreScopeProvider coreScopeProvider,
46+
IDistributedJobRepository distributedJobRepository,
47+
IEnumerable<IDistributedBackgroundJob> distributedBackgroundJobs,
48+
ILogger<DistributedJobService> logger,
49+
IOptions<DistributedJobSettings> settings)
3050
{
3151
_coreScopeProvider = coreScopeProvider;
3252
_distributedJobRepository = distributedJobRepository;
3353
_distributedBackgroundJobs = distributedBackgroundJobs;
3454
_logger = logger;
55+
_settings = settings.Value;
3556
}
3657

3758
/// <inheritdoc />
@@ -42,7 +63,8 @@ public DistributedJobService(
4263
scope.EagerWriteLock(Constants.Locks.DistributedJobs);
4364

4465
IEnumerable<DistributedBackgroundJobModel> jobs = _distributedJobRepository.GetAll();
45-
DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period);
66+
DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period
67+
&& (x.IsRunning is false || x.LastAttemptedRun < DateTime.UtcNow - x.Period - _settings.MaximumExecutionTime));
4668

4769
if (job is null)
4870
{
@@ -93,45 +115,64 @@ public async Task FinishAsync(string jobName)
93115
/// <inheritdoc />
94116
public async Task EnsureJobsAsync()
95117
{
118+
// Pre-compute registered job data outside the lock to minimize lock hold time
119+
var registeredJobsByName = _distributedBackgroundJobs.ToDictionary(x => x.Name, x => x.Period);
120+
121+
// Early exit if no registered jobs
122+
if (registeredJobsByName.Count is 0)
123+
{
124+
return;
125+
}
126+
96127
using ICoreScope scope = _coreScopeProvider.CreateCoreScope();
97128
scope.WriteLock(Constants.Locks.DistributedJobs);
98129

99130
DistributedBackgroundJobModel[] existingJobs = _distributedJobRepository.GetAll().ToArray();
100131
var existingJobsByName = existingJobs.ToDictionary(x => x.Name);
101132

102-
foreach (IDistributedBackgroundJob registeredJob in _distributedBackgroundJobs)
133+
// Collect all changes first, then execute - minimizes time spent in the critical section
134+
var jobsToAdd = new List<DistributedBackgroundJobModel>();
135+
DateTime utcNow = DateTime.UtcNow;
136+
137+
foreach (KeyValuePair<string, TimeSpan> registeredJob in registeredJobsByName)
103138
{
104-
if (existingJobsByName.TryGetValue(registeredJob.Name, out DistributedBackgroundJobModel? existingJob))
139+
if (existingJobsByName.TryGetValue(registeredJob.Key, out DistributedBackgroundJobModel? existingJob))
105140
{
106-
// Update if period has changed
107-
if (existingJob.Period != registeredJob.Period)
141+
// Update only if period has actually changed
142+
if (existingJob.Period != registeredJob.Value)
108143
{
109-
existingJob.Period = registeredJob.Period;
144+
existingJob.Period = registeredJob.Value;
110145
_distributedJobRepository.Update(existingJob);
111146
}
112147
}
113148
else
114149
{
115-
// Add new job (fresh install or newly registered job)
116-
var newJob = new DistributedBackgroundJobModel
150+
// Collect new jobs for batch insert
151+
jobsToAdd.Add(new DistributedBackgroundJobModel
117152
{
118-
Name = registeredJob.Name,
119-
Period = registeredJob.Period,
120-
LastRun = DateTime.UtcNow,
153+
Name = registeredJob.Key,
154+
Period = registeredJob.Value,
155+
LastRun = utcNow,
121156
IsRunning = false,
122-
LastAttemptedRun = DateTime.UtcNow,
123-
};
124-
_distributedJobRepository.Add(newJob);
157+
LastAttemptedRun = utcNow,
158+
});
125159
}
126160
}
127161

128-
// Remove jobs that are no longer registered in code
129-
var registeredJobNames = _distributedBackgroundJobs.Select(x => x.Name).ToHashSet();
130-
IEnumerable<DistributedBackgroundJobModel> jobsToRemove = existingJobs.Where(x => registeredJobNames.Contains(x.Name) is false);
162+
// Batch insert new jobs
163+
if (jobsToAdd.Count > 0)
164+
{
165+
_distributedJobRepository.Add(jobsToAdd);
166+
}
167+
168+
// Batch delete jobs that are no longer registered
169+
var jobsToRemove = existingJobs
170+
.Where(x => registeredJobsByName.ContainsKey(x.Name) is false)
171+
.ToList();
131172

132-
foreach (DistributedBackgroundJobModel jobToRemove in jobsToRemove)
173+
if (jobsToRemove.Count > 0)
133174
{
134-
_distributedJobRepository.Delete(jobToRemove);
175+
_distributedJobRepository.Delete(jobsToRemove);
135176
}
136177

137178
scope.Complete();

0 commit comments

Comments
 (0)