Skip to content

Commit a42c231

Browse files
committed
Use fine-grained lock for JDBC DAO implementations
Signed-off-by: Yanming Zhou <[email protected]>
1 parent a979784 commit a42c231

File tree

4 files changed

+80
-18
lines changed

4 files changed

+80
-18
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2006-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.core.repository.dao.jdbc;
18+
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.locks.Lock;
22+
import java.util.concurrent.locks.ReentrantLock;
23+
24+
/**
25+
* Fine-grained lock implementation.
26+
*
27+
* @author Yanming Zhou
28+
*/
29+
class FineGrainedLock<T> {
30+
31+
record Wrapper(Lock lock, AtomicInteger numberOfThreadsInQueue) {
32+
33+
private Wrapper addThreadInQueue() {
34+
numberOfThreadsInQueue.incrementAndGet();
35+
return this;
36+
}
37+
38+
private int removeThreadFromQueue() {
39+
return numberOfThreadsInQueue.decrementAndGet();
40+
}
41+
}
42+
43+
private final ConcurrentHashMap<T, Wrapper> locks = new ConcurrentHashMap<>();
44+
45+
public void lock(T key) {
46+
Wrapper wrapper = locks.compute(key,
47+
(k, v) -> v == null ? new Wrapper(new ReentrantLock(), new AtomicInteger(1)) : v.addThreadInQueue());
48+
wrapper.lock.lock();
49+
}
50+
51+
public void unlock(T key) {
52+
Wrapper wrapper = locks.get(key);
53+
if (wrapper == null) {
54+
throw new IllegalStateException("Lock on '" + key + "' doesn't exist, please lock it first");
55+
}
56+
wrapper.lock.unlock();
57+
if (wrapper.removeThreadFromQueue() == 0) {
58+
locks.remove(key, wrapper);
59+
}
60+
}
61+
62+
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Map.Entry;
33-
import java.util.concurrent.locks.Lock;
34-
import java.util.concurrent.locks.ReentrantLock;
3533

3634
import org.springframework.batch.core.job.JobExecution;
3735

@@ -58,6 +56,7 @@
5856
* @author Michael Minella
5957
* @author David Turanski
6058
* @author Mahmoud Ben Hassine
59+
* @author Yanming Zhou
6160
*/
6261
public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao {
6362

@@ -113,7 +112,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
113112

114113
private ExecutionContextSerializer serializer = new DefaultExecutionContextSerializer();
115114

116-
private final Lock lock = new ReentrantLock();
115+
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();
117116

118117
/**
119118
* Setter for {@link Serializer} implementation
@@ -194,9 +193,9 @@ public void updateExecutionContext(JobExecution jobExecution) {
194193
public void updateExecutionContext(StepExecution stepExecution) {
195194
// Attempt to prevent concurrent modification errors by blocking here if
196195
// someone is already trying to do it.
197-
this.lock.lock();
196+
Long executionId = stepExecution.getId();
197+
this.lock.lock(executionId);
198198
try {
199-
Long executionId = stepExecution.getId();
200199
ExecutionContext executionContext = stepExecution.getExecutionContext();
201200
Assert.notNull(executionId, "ExecutionId must not be null.");
202201
Assert.notNull(executionContext, "The ExecutionContext must not be null.");
@@ -206,7 +205,7 @@ public void updateExecutionContext(StepExecution stepExecution) {
206205
persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT);
207206
}
208207
finally {
209-
this.lock.unlock();
208+
this.lock.unlock(executionId);
210209
}
211210
}
212211

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import java.util.HashSet;
2525
import java.util.List;
2626
import java.util.Set;
27-
import java.util.concurrent.locks.Lock;
28-
import java.util.concurrent.locks.ReentrantLock;
2927

3028
import org.apache.commons.logging.Log;
3129
import org.apache.commons.logging.LogFactory;
@@ -153,7 +151,7 @@ SELECT COUNT(*)
153151

154152
private DataFieldMaxValueIncrementer jobExecutionIncrementer;
155153

156-
private final Lock lock = new ReentrantLock();
154+
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();
157155

158156
/**
159157
* Public setter for the exit message length in database. Do not set this if you
@@ -250,13 +248,15 @@ public void updateJobExecution(JobExecution jobExecution) {
250248

251249
validateJobExecution(jobExecution);
252250

253-
Assert.notNull(jobExecution.getId(),
251+
Long executionId = jobExecution.getId();
252+
253+
Assert.notNull(executionId,
254254
"JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
255255

256256
Assert.notNull(jobExecution.getVersion(),
257257
"JobExecution version cannot be null. JobExecution must be saved before it can be updated");
258258

259-
this.lock.lock();
259+
this.lock.lock(executionId);
260260
try {
261261

262262
String exitDescription = jobExecution.getExitStatus().getExitDescription();
@@ -304,7 +304,7 @@ public void updateJobExecution(JobExecution jobExecution) {
304304
jobExecution.incrementVersion();
305305
}
306306
finally {
307-
this.lock.unlock();
307+
this.lock.unlock(executionId);
308308
}
309309
}
310310

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.List;
25-
import java.util.concurrent.locks.Lock;
26-
import java.util.concurrent.locks.ReentrantLock;
2725

2826
import org.apache.commons.logging.Log;
2927
import org.apache.commons.logging.LogFactory;
@@ -127,7 +125,7 @@ SELECT COUNT(*)
127125

128126
private JdbcJobExecutionDao jobExecutionDao;
129127

130-
private final Lock lock = new ReentrantLock();
128+
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();
131129

132130
/**
133131
* Public setter for the exit message length in database. Do not set this if you
@@ -215,7 +213,10 @@ private void validateStepExecution(StepExecution stepExecution) {
215213
public void updateStepExecution(StepExecution stepExecution) {
216214

217215
validateStepExecution(stepExecution);
218-
Assert.notNull(stepExecution.getId(),
216+
217+
Long executionId = stepExecution.getId();
218+
219+
Assert.notNull(executionId,
219220
"StepExecution Id cannot be null. StepExecution must saved" + " before it can be updated.");
220221

221222
// Do not check for existence of step execution considering
@@ -225,7 +226,7 @@ public void updateStepExecution(StepExecution stepExecution) {
225226

226227
// Attempt to prevent concurrent modification errors by blocking here if
227228
// someone is already trying to do it.
228-
this.lock.lock();
229+
this.lock.lock(executionId);
229230
try {
230231

231232
Timestamp startTime = stepExecution.getStartTime() == null ? null
@@ -258,7 +259,7 @@ public void updateStepExecution(StepExecution stepExecution) {
258259

259260
}
260261
finally {
261-
this.lock.unlock();
262+
this.lock.unlock(executionId);
262263
}
263264
}
264265

0 commit comments

Comments
 (0)