Skip to content

Commit 7af8f87

Browse files
committed
feat: support pattern subscription non persistent topic.
1 parent 4c28ed5 commit 7af8f87

File tree

4 files changed

+45
-2
lines changed

4 files changed

+45
-2
lines changed

pulsar/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode # noqa: F401
5252

5353
from pulsar.__about__ import __version__
5454

@@ -708,6 +708,7 @@ def subscribe(self, topic, subscription_name,
708708
batch_receive_policy=None,
709709
key_shared_policy=None,
710710
batch_index_ack_enabled=False,
711+
regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
711712
):
712713
"""
713714
Subscribe to the given topic and subscription combination.
@@ -819,9 +820,11 @@ def my_listener(consumer, message):
819820
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
820821
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
821822
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
823+
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
822824

823825
conf = _pulsar.ConsumerConfiguration()
824826
conf.consumer_type(consumer_type)
827+
conf.regex_subscription_mode(regex_subscription_mode)
825828
conf.read_compacted(is_read_compacted)
826829
if message_listener:
827830
conf.message_listener(_listener_wrapper(message_listener, schema))

src/config.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ void export_config(py::module_& m) {
267267
.def("property", &ConsumerConfiguration::setProperty, return_value_policy::reference)
268268
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
269269
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
270+
.def("regex_subscription_mode", &ConsumerConfiguration::setRegexSubscriptionMode)
271+
.def("regex_subscription_mode", &ConsumerConfiguration::getRegexSubscriptionMode, return_value_policy::reference)
270272
.def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader, return_value_policy::reference)
271273
.def("replicate_subscription_state_enabled",
272274
&ConsumerConfiguration::setReplicateSubscriptionStateEnabled)

src/enums.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ void export_enums(py::module_& m) {
120120
.value("Latest", InitialPositionLatest)
121121
.value("Earliest", InitialPositionEarliest);
122122

123+
enum_<RegexSubscriptionMode>(m, "RegexSubscriptionMode", "Regex subscription mode")
124+
.value("PersistentOnly", PersistentOnly)
125+
.value("NonPersistentOnly", NonPersistentOnly)
126+
.value("AllTopics", AllTopics);
127+
123128
enum_<ProducerConfiguration::BatchingType>(m, "BatchingType", "Supported batching types")
124129
.value("Default", ProducerConfiguration::DefaultBatching)
125130
.value("KeyBased", ProducerConfiguration::KeyBasedBatching);

tests/pulsar_test.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
)
4747
from pulsar.schema import JsonSchema, Record, Integer
4848

49-
from _pulsar import ProducerConfiguration, ConsumerConfiguration
49+
from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode
5050

5151
from schema_test import *
5252

@@ -1718,6 +1718,39 @@ def test_batch_index_ack(self):
17181718

17191719
client.close()
17201720

1721+
def test_regex_subscription(self):
1722+
import re
1723+
1724+
client = Client(self.serviceUrl)
1725+
topic1 = "persistent://public/default/test-regex-sub-1"
1726+
topic2 = "persistent://public/default/test-regex-sub-2"
1727+
topic3 = "non-persistent://public/default/test-regex-sub-3"
1728+
topic4 = "persistent://public/default/no-match-test-regex-sub-3" # no match pattern rule topic.
1729+
1730+
producer1 = client.create_producer(topic1)
1731+
producer2 = client.create_producer(topic2)
1732+
producer3 = client.create_producer(topic3)
1733+
producer4 = client.create_producer(topic4)
1734+
1735+
consumer = client.subscribe(
1736+
re.compile('public/default/test-regex-sub-.*'), "regex-sub", consumer_type=ConsumerType.Shared,
1737+
regex_subscription_mode=RegexSubscriptionMode.AllTopics
1738+
)
1739+
1740+
num = 10
1741+
for i in range(num):
1742+
producer1.send(b"hello-1-%d" % i)
1743+
producer2.send(b"hello-2-%d" % i)
1744+
producer3.send(b"hello-3-%d" % i)
1745+
producer4.send(b"hello-4-%d" % i)
1746+
1747+
for i in range(3 * num):
1748+
msg = consumer.receive(TM)
1749+
consumer.acknowledge(msg)
1750+
1751+
with self.assertRaises(pulsar.Timeout):
1752+
consumer.receive(100)
1753+
client.close()
17211754

17221755
if __name__ == "__main__":
17231756
main()

0 commit comments

Comments
 (0)