Skip to content

Commit 8900a18

Browse files
authored
KAFKA-12999 Make RecordHeader reads thread-safe (KIP-1205) (#20751)
This patch implements [KIP-1205](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1205%3A+Improve+RecordHeader+to+be+Thread-Safe) to address concurrency issues (NPE) related to the lazy initialization of `RecordHeader` fields. The added test verified that concurrent access to `RecordHeader.key()` and `RecordHeader.value()` no longer throws `NullPointerException` Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 54cf2eb commit 8900a18

File tree

4 files changed

+68
-7
lines changed

4 files changed

+68
-7
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
* states. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized.
4646
*
4747
* <p>
48+
* However, each individual {@link org.apache.kafka.common.header.Header} instance
49+
* is <b>read thread-safe</b>; that is, it is safe for multiple threads to read the same header's key or value concurrently
50+
* as long as no thread modifies it.
51+
*
52+
* <p>
4853
* Refer to the {@link KafkaConsumer} documentation for more details on multi-threaded consumption and processing strategies.
4954
*/
5055
public class ConsumerRecord<K, V> {

clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
public class RecordHeader implements Header {
2727
private ByteBuffer keyBuffer;
28-
private String key;
29-
private ByteBuffer valueBuffer;
30-
private byte[] value;
28+
private volatile String key;
29+
private volatile ByteBuffer valueBuffer;
30+
private volatile byte[] value;
3131

3232
public RecordHeader(String key, byte[] value) {
3333
Objects.requireNonNull(key, "Null header keys are not permitted");
@@ -42,16 +42,24 @@ public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
4242

4343
public String key() {
4444
if (key == null) {
45-
key = Utils.utf8(keyBuffer, keyBuffer.remaining());
46-
keyBuffer = null;
45+
synchronized (this) {
46+
if (key == null) {
47+
key = Utils.utf8(keyBuffer, keyBuffer.remaining());
48+
keyBuffer = null;
49+
}
50+
}
4751
}
4852
return key;
4953
}
5054

5155
public byte[] value() {
5256
if (value == null && valueBuffer != null) {
53-
value = Utils.toArray(valueBuffer);
54-
valueBuffer = null;
57+
synchronized (this) {
58+
if (value == null && valueBuffer != null) {
59+
value = Utils.toArray(valueBuffer);
60+
valueBuffer = null;
61+
}
62+
}
5563
}
5664
return value;
5765
}

clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@
1919
import org.apache.kafka.common.header.Header;
2020
import org.apache.kafka.common.header.Headers;
2121

22+
import org.junit.jupiter.api.RepeatedTest;
2223
import org.junit.jupiter.api.Test;
2324

2425
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.charset.StandardCharsets;
2528
import java.util.Iterator;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.stream.Collectors;
32+
import java.util.stream.IntStream;
2633

2734
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
2835
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -265,4 +272,41 @@ static void assertHeader(String key, String value, Header actual) {
265272
assertArrayEquals(value.getBytes(), actual.value());
266273
}
267274

275+
private void assertRecordHeaderReadThreadSafe(RecordHeader header) {
276+
int threadCount = 16;
277+
CountDownLatch startLatch = new CountDownLatch(1);
278+
279+
var futures = IntStream.range(0, threadCount)
280+
.mapToObj(i -> CompletableFuture.runAsync(() -> {
281+
try {
282+
startLatch.await();
283+
header.key();
284+
header.value();
285+
} catch (InterruptedException e) {
286+
Thread.currentThread().interrupt();
287+
throw new RuntimeException(e);
288+
}
289+
})).collect(Collectors.toUnmodifiableList());
290+
291+
startLatch.countDown();
292+
futures.forEach(CompletableFuture::join);
293+
}
294+
295+
@RepeatedTest(100)
296+
public void testRecordHeaderIsReadThreadSafe() throws Exception {
297+
RecordHeader header = new RecordHeader(
298+
ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
299+
ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8))
300+
);
301+
assertRecordHeaderReadThreadSafe(header);
302+
}
303+
304+
@RepeatedTest(100)
305+
public void testRecordHeaderWithNullValueIsReadThreadSafe() throws Exception {
306+
RecordHeader header = new RecordHeader(
307+
ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
308+
null
309+
);
310+
assertRecordHeaderReadThreadSafe(header);
311+
}
268312
}

docs/upgrade.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ <h5><a id="upgrade_4_2_0_from" href="#upgrade_4_2_0_from">Upgrading Servers to 4
2525

2626
<h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4.2.0</a></h5>
2727
<ul>
28+
<li>
29+
The <code>org.apache.kafka.common.header.internals.RecordHeader</code> class has been updated to be read thread-safe. See <a href="https://cwiki.apache.org/confluence/x/nYmhFg">KIP-1205</a> for details.
30+
In other words, each individual <code>Header</code> object within a <code>ConsumerRecord</code>'s <code>headers</code> can now be safely read from multiple threads concurrently.
31+
</li>
2832
<li>
2933
The <code>org.apache.kafka.disallowed.login.modules</code> config was deprecated. Please use the <code>org.apache.kafka.allowed.login.modules</code>
3034
instead.

0 commit comments

Comments
 (0)