Skip to content

Commit f4de248

Browse files
BewareMyPowerlhotari
authored andcommitted
[fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571)
(cherry picked from commit 16271dc)
1 parent 6f91344 commit f4de248

File tree

4 files changed

+72
-10
lines changed

4 files changed

+72
-10
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3073,7 +3073,7 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
30733073

30743074
// Truncate to ensure the offloaded data is not orphaned.
30753075
// Also ensures the BK ledgers are deleted and not just scheduled for deletion
3076-
CompletableFuture<Void> truncateFuture = asyncTruncate();
3076+
CompletableFuture<Void> truncateFuture = asyncTruncate(true);
30773077
truncateFuture.whenComplete((ignore, exc) -> {
30783078
if (exc != null) {
30793079
log.error("[{}] Error truncating ledger for deletion", name, exc);
@@ -4480,6 +4480,12 @@ public void setEntriesAddedCounter(long count) {
44804480

44814481
@Override
44824482
public CompletableFuture<Void> asyncTruncate() {
4483+
return asyncTruncate(false);
4484+
}
4485+
4486+
// When asyncTruncate is called by asyncDelete, the argument should be true because cursors will not be accessed
4487+
// after the managed ledger is deleted.
4488+
private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) {
44834489

44844490
final List<CompletableFuture<Void>> futures = new ArrayList();
44854491
for (ManagedCursor cursor : cursors) {
@@ -4492,7 +4498,12 @@ public void clearBacklogComplete(Object ctx) {
44924498

44934499
@Override
44944500
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
4495-
future.completeExceptionally(exception);
4501+
if (ignoreCursorFailure) {
4502+
log.warn("Failed to clear backlog for cursor {}", cursor.getName(), exception);
4503+
future.complete(null);
4504+
} else {
4505+
future.completeExceptionally(exception);
4506+
}
44964507
}
44974508
}, null);
44984509
futures.add(future);

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import static org.mockito.Mockito.when;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertTrue;
27+
import java.io.Closeable;
2728
import java.net.InetSocketAddress;
29+
import java.time.Duration;
2830
import java.util.List;
2931
import java.util.UUID;
3032
import java.util.concurrent.CompletableFuture;
@@ -36,26 +38,29 @@
3638
import org.apache.pulsar.client.admin.PulsarAdminException;
3739
import org.apache.pulsar.client.api.ClientBuilder;
3840
import org.apache.pulsar.client.api.Consumer;
41+
import org.apache.pulsar.client.api.MessageId;
3942
import org.apache.pulsar.client.api.Producer;
4043
import org.apache.pulsar.client.api.ProducerConsumerBase;
44+
import org.apache.pulsar.client.api.PulsarClient;
4145
import org.apache.pulsar.client.api.PulsarClientException;
4246
import org.apache.pulsar.client.impl.LookupService;
4347
import org.apache.pulsar.client.impl.LookupTopicResult;
4448
import org.apache.pulsar.client.impl.PulsarClientImpl;
4549
import org.apache.pulsar.common.naming.NamespaceName;
50+
import org.apache.pulsar.common.naming.TopicName;
4651
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
4752
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
4853
import org.apache.pulsar.common.policies.data.TopicType;
49-
import org.testng.annotations.AfterMethod;
50-
import org.testng.annotations.BeforeMethod;
54+
import org.testng.annotations.AfterClass;
55+
import org.testng.annotations.BeforeClass;
5156
import org.testng.annotations.Test;
5257

5358
@Test(groups = "broker-admin")
5459
@Slf4j
5560
public class TopicAutoCreationTest extends ProducerConsumerBase {
5661

5762
@Override
58-
@BeforeMethod
63+
@BeforeClass
5964
protected void setup() throws Exception {
6065
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
6166
conf.setAllowAutoTopicCreation(true);
@@ -71,7 +76,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
7176
}
7277

7378
@Override
74-
@AfterMethod(alwaysRun = true)
79+
@AfterClass(alwaysRun = true)
7580
protected void cleanup() throws Exception {
7681
super.internalCleanup();
7782
}
@@ -87,9 +92,11 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls
8792
.create();
8893

8994
List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName);
95+
assertTrue(partitionedTopics.contains(topic));
9096
List<String> topics = admin.topics().getList(namespaceName);
91-
assertEquals(partitionedTopics.size(), 1);
92-
assertEquals(topics.size(), 3);
97+
for (int i = 0; i < conf.getDefaultNumPartitions(); i++) {
98+
assertTrue(topics.contains(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
99+
}
93100

94101
producer.close();
95102
for (String t : topics) {
@@ -248,4 +255,48 @@ public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminE
248255
admin.namespaces().deleteNamespace(namespace, true);
249256
}
250257

258+
@Test
259+
public void testPartitionsNotCreatedAfterDeletion() throws Exception {
260+
@Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
261+
final var topicName = TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion");
262+
final var topic = topicName.toString();
263+
final var interval = Duration.ofSeconds(1);
264+
final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = creator -> {
265+
admin.topics().createPartitionedTopic(topic, 1);
266+
boolean needCleanup = false;
267+
try (final var ignored = creator.get()) {
268+
admin.topics().terminatePartitionedTopic(topic);
269+
admin.topics().deletePartitionedTopic(topic, true);
270+
Thread.sleep(interval.toMillis() + 500); // wait until the auto update partitions task has run
271+
272+
final var topics = admin.topics().getList(topicName.getNamespace()).stream()
273+
.filter(__ -> __.contains(topicName.getLocalName())).toList();
274+
// Without https://github.com/apache/pulsar/pull/24118, the producer or consumer on partition 0 could be
275+
// automatically created.
276+
if (!topics.isEmpty()) {
277+
assertEquals(topics, List.of(topicName.getPartition(0).toString()));
278+
needCleanup = true;
279+
}
280+
}
281+
if (needCleanup) {
282+
admin.topics().delete(topicName.getPartition(0).toString());
283+
}
284+
};
285+
verifier.accept(() -> client.newProducer().topic(topic)
286+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
287+
verifier.accept(() -> client.newConsumer().topic(topic).subscriptionName("sub")
288+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).subscribe());
289+
verifier.accept(() -> client.newReader().topic(topic).startMessageId(MessageId.earliest)
290+
.autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
291+
}
292+
293+
private interface ThrowableConsumer<T> {
294+
295+
void accept(T value) throws Exception;
296+
}
297+
298+
public interface ThrowableSupplier<T> {
299+
300+
T get() throws Exception;
301+
}
251302
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1396,7 +1396,7 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
13961396
private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
13971397
int oldPartitionNumber = partitionedTopics.get(topicName);
13981398

1399-
return client.getPartitionsForTopic(topicName).thenCompose(list -> {
1399+
return client.getPartitionsForTopic(topicName, false).thenCompose(list -> {
14001400
int currentPartitionNumber = Long.valueOf(list.stream()
14011401
.filter(t -> TopicName.get(t).isPartitioned()).count()).intValue();
14021402

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
391391
return future;
392392
}
393393

394-
client.getPartitionsForTopic(topic).thenCompose(list -> {
394+
client.getPartitionsForTopic(topic, false).thenCompose(list -> {
395395
int oldPartitionNumber = topicMetadata.numPartitions();
396396
int currentPartitionNumber = list.size();
397397

0 commit comments

Comments
 (0)