Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public partial class NatsConnection : INatsConnection
private CancellationTokenSource? _pingTimerCancellationTokenSource;
private volatile NatsUri? _currentConnectUri;
private volatile NatsUri? _lastSeedConnectUri;
private volatile NatsUri? _lastAttemptResolvedUri;
private NatsReadProtocolProcessor? _socketReader;
private TaskCompletionSource _waitForOpenConnection;
private UserCredentials? _userCredentials;
Expand Down Expand Up @@ -381,22 +382,33 @@ private async ValueTask InitialConnectAsync()
_userCredentials = new UserCredentials(Opts.AuthOpts);
}

var attemptedUris = new List<NatsUri>();

foreach (var uri in uris)
{
try
{
await ConnectSocketAsync(uri).ConfigureAwait(false);
var resolvedUri = await ConnectSocketAsync(uri).ConfigureAwait(false);
attemptedUris.Add(resolvedUri);
_currentConnectUri = resolvedUri; // set only after success
break;
}
catch (Exception ex)
{
_logger.LogError(NatsLogEvents.Connection, ex, "Fail to connect NATS {Url}", uri);
var failedResolvedUri = _lastAttemptResolvedUri ?? uri; // local cache from ConnectSocketAsync
attemptedUris.Add(failedResolvedUri);
_logger.LogError(NatsLogEvents.Connection, ex, "Fail to connect NATS {Url}", failedResolvedUri);
_currentConnectUri = null; // ensure cleared on failure
}
finally
{
_lastAttemptResolvedUri = null;
}
}

if (_socketConnection == null)
{
var exception = new NatsException("can not connect uris: " + string.Join(",", uris.Select(x => x.ToString())));
var exception = new NatsException("can not connect uris: " + string.Join(",", attemptedUris.Select(x => x.ToString())));
lock (_gate)
{
ConnectionState = NatsConnectionState.Closed; // allow retry connect
Expand Down Expand Up @@ -459,7 +471,7 @@ private async ValueTask InitialConnectAsync()
}
}

private async Task ConnectSocketAsync(NatsUri uri)
private async Task<NatsUri> ConnectSocketAsync(NatsUri uri)
{
var target = (uri.Host, uri.Port);
if (OnConnectingAsync != null)
Expand All @@ -468,10 +480,16 @@ private async Task ConnectSocketAsync(NatsUri uri)
target = await OnConnectingAsync(target).ConfigureAwait(false);
if (target.Host != uri.Host || target.Port != uri.Port)
{
uri = uri with { Uri = new UriBuilder(uri.Uri) { Host = target.Host, Port = target.Port, }.Uri };
var modifiedUri = new UriBuilder(uri.Uri) { Host = target.Host, Port = target.Port }.Uri;
var newUri = new NatsUri(modifiedUri.ToString(), uri.IsSeed, uri.Uri.Scheme);
_logger.LogDebug(NatsLogEvents.Connection, "OnConnectingAsync override: {Original} -> {Resolved}", uri.Uri, newUri.Uri);
uri = newUri;
}
}

// Keep for failure logging.
_lastAttemptResolvedUri = uri;

var connectionFactory = Opts.SocketConnectionFactory ?? (uri.IsWebSocket ? WebSocketFactory.Default : TcpFactory.Default);
_logger.LogInformation(NatsLogEvents.Connection, "Connect to NATS using {FactoryType} {Uri}", connectionFactory.GetType().Name, uri);
using var timeoutCts = new CancellationTokenSource(Opts.ConnectTimeout);
Expand Down Expand Up @@ -501,7 +519,8 @@ private async Task ConnectSocketAsync(NatsUri uri)
_socketConnection = _socketConnection with { InnerSocket = await OnSocketAvailableAsync(_socketConnection.InnerSocket).ConfigureAwait(false) };
}

_currentConnectUri = uri;
// Caller assigns _currentConnectUri.
return uri;
}

private async ValueTask SetupReaderWriterAsync(bool reconnect)
Expand Down
Loading