Skip to content

Commit cf99e90

Browse files
authored
Merge branch 'main' into fix-proxy-hostname-resolution
2 parents 43ece3f + 3eb2924 commit cf99e90

File tree

74 files changed

+2919
-1512
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+2919
-1512
lines changed

README.md

Lines changed: 133 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,10 @@ from both streams and directly from other NATS producers.
311311
### The JetStream Context
312312
313313
After establishing a connection as described above, create a JetStream Context.
314-
315-
```java
316-
JetStream js = nc.jetStream();
317-
```
314+
315+
```java
316+
JetStream js = nc.jetStream();
317+
```
318318
319319
You can pass options to configure the JetStream client, although the defaults should
320320
suffice for most users. See the `JetStreamOptions` class.
@@ -331,13 +331,13 @@ before publishing. You can publish in either a synchronous or asynchronous manne
331331
**Synchronous:**
332332
333333
```java
334-
// create a typical NATS message
335-
Message msg = NatsMessage.builder()
336-
.subject("foo")
337-
.data("hello", StandardCharsets.UTF_8)
338-
.build();
339-
340-
PublishAck pa = js.publish(msg);
334+
// create a typical NATS message
335+
Message msg = NatsMessage.builder()
336+
.subject("foo")
337+
.data("hello", StandardCharsets.UTF_8)
338+
.build();
339+
340+
PublishAck pa = js.publish(msg);
341341
```
342342
343343
See `NatsJsPub.java` in the JetStream examples for a detailed and runnable example.
@@ -358,39 +358,38 @@ The PublishOptions are immutable, but the builder an be re-used for expectations
358358
For example:
359359
360360
```java
361-
PublishOptions.Builder pubOptsBuilder = PublishOptions.builder()
362-
.expectedStream("TEST")
363-
.messageId("mid1");
364-
PublishAck pa = js.publish("foo", null, pubOptsBuilder.build());
365-
366-
pubOptsBuilder.clearExpected()
367-
.setExpectedLastMsgId("mid1")
368-
.setExpectedLastSequence(1)
369-
.messageId("mid2");
370-
pa = js.publish("foo", null, pubOptsBuilder.build());
361+
PublishOptions.Builder pubOptsBuilder = PublishOptions.builder()
362+
.expectedStream("TEST")
363+
.messageId("mid1");
364+
PublishAck pa = js.publish("foo", null, pubOptsBuilder.build());
365+
366+
pubOptsBuilder.clearExpected()
367+
.setExpectedLastMsgId("mid1")
368+
.setExpectedLastSequence(1)
369+
.messageId("mid2");
370+
pa = js.publish("foo", null, pubOptsBuilder.build());
371371
```
372372
373373
See `NatsJsPubWithOptionsUseCases.java` in the JetStream examples for a detailed and runnable example.
374374
375375
**Asynchronous:**
376376
377377
```java
378+
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
379+
for (int x = 1; x < roundCount; x++) {
380+
// create a typical NATS message
381+
Message msg = NatsMessage.builder()
382+
.subject("foo")
383+
.data("hello", StandardCharsets.UTF_8)
384+
.build();
385+
386+
// Publish a message
387+
futures.add(js.publishAsync(msg));
388+
}
378389
379-
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
380-
for (int x = 1; x < roundCount; x++) {
381-
// create a typical NATS message
382-
Message msg = NatsMessage.builder()
383-
.subject("foo")
384-
.data("hello", StandardCharsets.UTF_8)
385-
.build();
386-
387-
// Publish a message
388-
futures.add(js.publishAsync(msg));
389-
}
390-
391-
for (CompletableFuture<PublishAck> future : futures) {
392-
... process the futures
393-
}
390+
for (CompletableFuture<PublishAck> future : futures) {
391+
... process the futures
392+
}
394393
```
395394
396395
See the `NatsJsPubAsync.java` in the JetStream examples for a detailed and runnable example.
@@ -412,20 +411,20 @@ Push subscriptions can be synchronous or asynchronous. The server *pushes* messa
412411
**Asynchronous:**
413412
414413
```java
415-
Dispatcher disp = ...;
416-
417-
MessageHandler handler = (msg) -> {
418-
// Process the message.
419-
// Ack the message depending on the ack model
420-
};
421-
422-
PushSubscribeOptions so = PushSubscribeOptions.builder()
423-
.durable("optional-durable-name")
424-
.build();
425-
426-
boolean autoAck = ...
427-
428-
js.subscribe("my-subject", disp, handler, autoAck);
414+
Dispatcher disp = ...;
415+
416+
MessageHandler handler = (msg) -> {
417+
// Process the message.
418+
// Ack the message depending on the ack model
419+
};
420+
421+
PushSubscribeOptions so = PushSubscribeOptions.builder()
422+
.durable("optional-durable-name")
423+
.build();
424+
425+
boolean autoAck = ...
426+
427+
js.subscribe("my-subject", disp, handler, autoAck);
429428
```
430429
431430
See the `NatsJsPushSubWithHandler.java` in the JetStream examples for a detailed and runnable example.
@@ -435,45 +434,74 @@ See the `NatsJsPushSubWithHandler.java` in the JetStream examples for a detailed
435434
See `NatsJsPushSub.java` in the JetStream examples for a detailed and runnable example.
436435
437436
```java
438-
PushSubscribeOptions so = PushSubscribeOptions.builder()
439-
.durable("optional-durable-name")
440-
.build();
437+
PushSubscribeOptions so = PushSubscribeOptions.builder()
438+
.durable("optional-durable-name")
439+
.build();
441440
442-
// Subscribe synchronously, then just wait for messages.
443-
JetStreamSubscription sub = js.subscribe("subject", so);
444-
nc.flush(Duration.ofSeconds(5));
441+
// Subscribe synchronously, then just wait for messages.
442+
JetStreamSubscription sub = js.subscribe("subject", so);
443+
nc.flush(Duration.ofSeconds(5));
445444
446-
Message msg = sub.nextMessage(Duration.ofSeconds(1));
445+
Message msg = sub.nextMessage(Duration.ofSeconds(1));
447446
```
448447
449-
### Pull Subscribing
448+
### Pull Subscriptions
450449
451450
Pull subscriptions are always synchronous. The server organizes messages into a batch
452451
which it sends when requested.
453452
454453
```java
455-
PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
456-
.durable("durable-name-is-required")
457-
.build();
458-
459-
JetStreamSubscription sub = js.subscribe("subject", pullOptions);
454+
PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
455+
.durable("durable-name-is-optional")
456+
.build();
457+
JetStreamSubscription sub = js.subscribe("subject", pullOptions);
460458
```
461459
460+
**Bind:**
461+
462+
Pull subscriptions allow for binding to existing consumers.
463+
The best practice is to provide `null` for the subscribe subject, but if you do
464+
provide it, it must match the consumer subject filter, or you will receive an
465+
`IllegalArgumentException`. See client errors below and `JsSubSubjectDoesNotMatchFilter 90011`
466+
467+
1. Short Form
468+
469+
```java
470+
PullSubscribeOptions pullOptions = PullSubscribeOptions.bind("stream", "durable-name");
471+
JetStreamSubscription sub = js.subscribe(null, pullOptions);
472+
```
473+
474+
2. Long Form
475+
476+
```java
477+
PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
478+
.stream("stream")
479+
.durable("durable-name")
480+
.bind(true)
481+
.build();
482+
```
483+
462484
**Fetch:**
463485
464486
```java
465-
List<Message> message = sub.fetch(100, Duration.ofSeconds(1));
466-
for (Message m : messages) {
467-
// process message
468-
m.ack();
469-
}
487+
List<Message> message = sub.fetch(100, Duration.ofSeconds(1));
488+
for (Message m : messages) {
489+
// process message
490+
m.ack();
491+
}
470492
```
471493
472-
The fetch pull is a *macro* pull that uses advanced pulls under the covers to return a list of messages.
473-
The list may be empty or contain at most the batch size. All status messages are handled for you.
494+
The fetch method is a *macro* pull that uses advanced pulls under the covers to return a list of messages.
495+
The list may be empty or contain at most the batch size.
496+
All status messages are handled for you except terminal status messages. See Pull Exception Handling below.
474497
The client can provide a timeout to wait for the first message in a batch.
475498
The fetch call returns when the batch is ready.
476-
The timeout may be exceeded if the server sent messages very near the end of the timeout period.
499+
If the timeout is exceeded while messages are in flight, but before they reach the client,
500+
those messages will be available via nextMessage or will be used to fulfill the next fetch.
501+
502+
One important thing to consider when using this is ack wait. Once the server sends a message,
503+
it's specific ack wait timer is started. If you ask for too many messages, you may fail to
504+
ack all messages in time and can get redeliveries.
477505

478506
See `NatsJsPullSubFetch.java` and `NatsJsPullSubFetchUseCases.java`
479507
in the JetStream examples for a detailed and runnable example.
@@ -489,67 +517,75 @@ in the JetStream examples for a detailed and runnable example.
489517
}
490518
```
491519

492-
The iterate pull is a *macro* pull that uses advanced pulls under the covers to return an iterator.
520+
The iterate method is a *macro* pull that uses advanced pulls under the covers to return an iterator.
493521
The iterator may have no messages up to at most the batch size.
494-
All status messages are handled for you.
522+
All status messages are handled for you except terminal status messages. See Pull Exception Handling below.
495523
The client can provide a timeout to wait for the first message in a batch.
496524
The iterate call returns the iterator immediately, but under the covers it will wait for the first
497525
message based on the timeout.
498-
The timeout may be exceeded if the server sent messages very near the end of the timeout period.
526+
527+
The iterate method is usually preferred to the fetch method as it allows you to start processing messages
528+
right away instead of waiting until the entire batch is filled. This reduces problems with ack wait
529+
and generally is more efficient.
499530

500531
See `NatsJsPullSubIterate.java` and `NatsJsPullSubIterateUseCases.java`
501532
in the JetStream examples for a detailed and runnable example.
502533

503534
**Batch Size:**
504535

505536
```java
506-
sub.pull(100);
507-
...
508-
Message m = sub.nextMessage(Duration.ofSeconds(1));
537+
sub.pull(100);
538+
...
539+
Message m = sub.nextMessage(Duration.ofSeconds(1));
509540
```
510541

511-
An advanced version of pull specifies a batch size. When asked, the server will send whatever
542+
This is an advanced / raw pull that specifies a batch size. When asked, the server will send whatever
512543
messages it has up to the batch size. If it has no messages it will wait until it has some to send.
513-
The client may time out before that time. If there are less than the batch size available,
514-
you can ask for more later. Once the entire batch size has been filled, you must make another pull request.
544+
The pull request only completes on the server once the entire batch has been sent.
545+
It's up to you to track this and only send pulls when the batch is complete, or you risk having
546+
pulls stack up and possibly receiving a status `409 Exceeded MaxWaiting` warning.
547+
The nextMessage request may time out but that does not indicate that there are no more messages in the pull.
548+
Instead, it indicates that there is no message available at that time.
549+
Once the entire batch size has been filled, you must make another pull request.
515550

516551
See `NatsJsPullSubBatchSize.java` and `NatsJsPullSubBatchSizeUseCases.java`
517552
in the JetStream examples for detailed and runnable example.
518553

519554
**No Wait and Batch Size:**
520555

521556
```java
522-
sub.pullNoWait(100);
523-
...
524-
Message m = sub.nextMessage(Duration.ofSeconds(1));
557+
sub.pullNoWait(100);
558+
...
559+
Message m = sub.nextMessage(Duration.ofSeconds(1));
525560
```
526561

527-
An advanced version of pull also specifies a batch size. When asked, the server will send whatever
528-
messages it has up to the batch size, but will never wait for the batch to fill and the client
529-
will return immediately. If there are less than the batch size available, you will get what is
530-
available and a 404 status message indicating the server did not have enough messages.
531-
You must make a pull request every time. **This is an advanced api**
562+
This is an advanced / raw pull that also specifies a batch size.
563+
When asked, the server will send whatever messages it has at the moment
564+
the pull request is processed by the server, up to the batch size.
565+
If there are less than the batch size available, you will get what is
566+
available.
567+
You must make a pull request every time.
532568

533569
See the `NatsJsPullSubNoWaitUseCases.java` in the JetStream examples for a detailed and runnable example.
534570

535571
**Expires In and Batch Size:**
536572

537573
```java
538-
sub.pullExpiresIn(100, Duration.ofSeconds(3));
539-
...
540-
Message m = sub.nextMessage(Duration.ofSeconds(4));
574+
sub.pullExpiresIn(100, Duration.ofSeconds(3));
575+
...
576+
Message m = sub.nextMessage(Duration.ofSeconds(4));
541577
```
542578

543579
Another advanced version of pull specifies a maximum time to wait for the batch to fill.
544-
The server returns messages when either the batch is filled or the time expires. It's important to
545-
set your client's timeout to be longer than the time you've asked the server to expire in.
546-
You must make a pull request every time. In subsequent pulls, you will receive multiple 408 status
547-
messages, one for each message the previous batch was short. You can just ignore these.
548-
**This is an advanced api**
580+
The server sends messages up until the batch is filled or the time expires. It's important to
581+
set your client's nextMessage timeout to be longer than the time you've asked the server to expire in.
582+
Once nextMessage returns null, you know your pull is done, and you can make another one.
549583

550584
See `NatsJsPullSubExpire.java` and `NatsJsPullSubExpireUseCases.java`
551585
in the JetStream examples for detailed and runnable examples.
552586

587+
**Pull Exception Handling:**
588+
553589
### Ordered Push Subscription Option
554590

555591
You can now set a Push Subscription option called "Ordered".
@@ -568,7 +604,7 @@ You can however set the deliver policy which will be used to start the subscript
568604

569605
### Client Error Messages
570606

571-
In addition to some generic validation messages for values in builders, there are also additional grouped and numbered client error messages:
607+
`In addition to some generic validation messages for values in builders, there are also additional grouped and numbered client error messages:
572608
* Subscription building and creation
573609
* Consumer creation
574610
* Object Store operations

src/examples/java/io/nats/examples/ExampleUtils.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,29 @@ public static String getServer(String[] args) {
3434

3535
public static final ErrorListener EXAMPLE_ERROR_LISTENER = new ErrorListenerLoggerImpl();
3636

37-
public static Options createExampleOptions(String[] args) throws Exception {
37+
public static Options createExampleOptions(String[] args) {
3838
String server = getServer(args);
3939
return createExampleOptions(server, false, EXAMPLE_ERROR_LISTENER, EXAMPLE_CONNECTION_LISTENER);
4040
}
4141

42-
public static Options createExampleOptions(String[] args, boolean allowReconnect) throws Exception {
42+
public static Options createExampleOptions(String[] args, boolean allowReconnect) {
4343
String server = getServer(args);
4444
return createExampleOptions(server, allowReconnect, EXAMPLE_ERROR_LISTENER, EXAMPLE_CONNECTION_LISTENER);
4545
}
4646

47-
public static Options createExampleOptions(String server) throws Exception {
47+
public static Options createExampleOptions(String server) {
4848
return createExampleOptions(server, false, EXAMPLE_ERROR_LISTENER, EXAMPLE_CONNECTION_LISTENER);
4949
}
5050

51-
public static Options createExampleOptions(String server, ErrorListener el) throws Exception {
51+
public static Options createExampleOptions(String server, ErrorListener el) {
5252
return createExampleOptions(server, false, el, EXAMPLE_CONNECTION_LISTENER);
5353
}
5454

55-
public static Options createExampleOptions(String server, boolean allowReconnect) throws Exception {
55+
public static Options createExampleOptions(String server, boolean allowReconnect) {
5656
return createExampleOptions(server, allowReconnect, EXAMPLE_ERROR_LISTENER, EXAMPLE_CONNECTION_LISTENER);
5757
}
5858

59-
public static Options createExampleOptions(String server, boolean allowReconnect, ErrorListener el, ConnectionListener cl) throws Exception {
59+
public static Options createExampleOptions(String server, boolean allowReconnect, ErrorListener el, ConnectionListener cl) {
6060

6161
if (el == null) { el = new ErrorListener() {}; }
6262

@@ -75,7 +75,13 @@ public static Options createExampleOptions(String server, boolean allowReconnect
7575
}
7676

7777
if (System.getenv("NATS_NKEY") != null && System.getenv("NATS_NKEY") != "") {
78-
AuthHandler handler = new ExampleAuthHandler(System.getenv("NATS_NKEY"));
78+
AuthHandler handler = null;
79+
try {
80+
handler = new ExampleAuthHandler(System.getenv("NATS_NKEY"));
81+
}
82+
catch (Exception e) {
83+
throw new RuntimeException(e);
84+
}
7985
builder.authHandler(handler);
8086
} else if (System.getenv("NATS_CREDS") != null && System.getenv("NATS_CREDS") != "") {
8187
builder.authHandler(Nats.credentials(System.getenv("NATS_CREDS")));

0 commit comments

Comments
 (0)