diff --git a/sandbox/MicroBenchmark/ChannelPassingBenchmarks.cs b/sandbox/MicroBenchmark/ChannelPassingBenchmarks.cs new file mode 100644 index 000000000..e2230a87a --- /dev/null +++ b/sandbox/MicroBenchmark/ChannelPassingBenchmarks.cs @@ -0,0 +1,273 @@ +using System.Threading.Channels; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Jobs; +using NATS.Client.Core; +using NATS.Client.Core.Commands; +using NATS.Client.Core.Internal; + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[SimpleJob(RuntimeMoniker.Net80)] +public class ChannelPassingBenchmarks +{ + private NatsConnection Connection; + private SubWrappedChannel _inFlightNatsMsgChannel; + private Channel> _natsMsgChannel; + private BoundedChannelOptions ChannelOpts; + private CancellationTokenSource _cts; + + [GlobalCleanup] + public void TearDown() + { + _cts.Dispose(); + _natsMsgChannel = null; + _inFlightNatsMsgChannel = null; + } + + [GlobalSetup] + public void Setup() + { + Connection = new NatsConnection(); + ChannelOpts = Connection.GetChannelOpts(Connection.Opts, default); + _inFlightNatsMsgChannel = new SubWrappedChannel( + Channel.CreateBounded>( + ChannelOpts), + Connection); + _natsMsgChannel = Channel.CreateBounded>( + ChannelOpts); + _cts = new CancellationTokenSource(); + } + + [Benchmark] + public void RunNatsMsgChannel_Sync() + { + var maxCount = ChannelOpts.Capacity; + + + for (int i = 0; i < maxCount; i++) + { + _natsMsgChannel.Writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + } + + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + reader.TryRead(out _); + } + } + + [Benchmark] + public void RunInFlightNatsMsgChannel_Sync() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _inFlightNatsMsgChannel.Writer; + for (int i = 0; i < maxCount; i++) + { + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + } + + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + reader.TryRead(out _); + } + } + + [Benchmark] + public void RunNatsMsgChannel_Sync_Pulse() + { + var maxCount = ChannelOpts.Capacity; + + + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + _natsMsgChannel.Writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + reader.TryRead(out _); + } + } + + [Benchmark] + public void RunInFlightNatsMsgChannel_Sync_Pulse() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _inFlightNatsMsgChannel.Writer; + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + reader.TryRead(out _); + } + } + + [Benchmark] + public async Task RunNatsMsgChannel_Async_Pulse() + { + var maxCount = ChannelOpts.Capacity; + + + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + _natsMsgChannel.Writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + await reader.ReadAsync(_cts.Token); + } + } + + [Benchmark] + public async Task RunInFlightNatsMsgChannel_Async_Pulse() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _inFlightNatsMsgChannel.Writer; + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + await reader.ReadAsync(_cts.Token); + } + } + + [Benchmark] + public async Task RunNatsMsgChannel_Async_Pulse_Unhappy() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _natsMsgChannel.Writer; + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + var r = reader.ReadAsync(_cts.Token); + writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + var v = await r; + if (v.Subject == null) + { + Console.WriteLine("wat"); + } + } + } + + [Benchmark] + public async Task RunInFlightNatsMsgChannel_Async_Pulse_Unhappy() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _inFlightNatsMsgChannel.Writer; + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + var r = reader.ReadAsync(_cts.Token); + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + var v = await r; + if (v.Subject == null) + { + Console.WriteLine("wat"); + } + } + } + + [Benchmark] + public async Task RunNatsMsgChannel_Async_Unhappy() + { + var maxCount = ChannelOpts.Capacity; + + var readTask = Task.Run( + async () => + { + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + await reader.ReadAsync(_cts.Token); + } + }); +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run( +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + () => + { + for (int i = 0; i < maxCount; i++) + { + _natsMsgChannel.Writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + } + }); + await readTask; + } + + [Benchmark] + public async Task RunInFlightNatsMsgChannel_Async_Unhappy() + { + var maxCount = ChannelOpts.Capacity; + + var readTask = Task.Run( + async () => + { + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + await reader.ReadAsync(_cts.Token); + } + }); + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run( +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + () => + { + var writer = _inFlightNatsMsgChannel.Writer; + for (int i = 0; i < maxCount; i++) + { + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + } + }); + await readTask; + } + + [Benchmark] + public async Task RunNatsMsgChannel_Async_Happy() + { + var maxCount = ChannelOpts.Capacity; + + for (int i = 0; i < maxCount; i++) + { + _natsMsgChannel.Writer.TryWrite(new NatsMsg("t", default, 3, default, "foo", default)); + } + + var readTask = Task.Run( + async () => + { + var reader = _natsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + await reader.ReadAsync(_cts.Token); + } + }); + + await readTask; + } + + [Benchmark] + public async Task RunInFlightNatsMsgChannel_Async_Happy() + { + var maxCount = ChannelOpts.Capacity; + + var writer = _inFlightNatsMsgChannel.Writer; + for (int i = 0; i < maxCount; i++) + { + writer.TryWrite(new InFlightNatsMsg("t", default, 3, default, "foo")); + } + + var readTask = Task.Run( + async () => + { + var reader = _inFlightNatsMsgChannel.Reader; + for (int i = 0; i < maxCount; i++) + { + await reader.ReadAsync(_cts.Token); + } + }); + await readTask; + } +} diff --git a/src/NATS.Client.Core/Commands/PooledValueTaskSource.cs b/src/NATS.Client.Core/Commands/PooledValueTaskSource.cs new file mode 100644 index 000000000..6a0115c74 --- /dev/null +++ b/src/NATS.Client.Core/Commands/PooledValueTaskSource.cs @@ -0,0 +1,234 @@ +using System.Collections.Concurrent; +using System.Linq.Expressions; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using System.Threading.Tasks.Sources; +using NATS.Client.Core.Internal; + +namespace NATS.Client.Core.Commands; + + +public struct CheatingPeeker +{ + internal readonly object? _obj; + /// The result to be used if the operation completed successfully synchronously. + internal readonly TResult? _result; + /// Opaque value passed through to the . + internal readonly short _token; +} +internal static class PVTSSentinels +{ + +} +public sealed class PooledValueTaskSource : + IValueTaskSource> +{ + private INatsConnection? _connection; + private ManualResetValueTaskSourceCore> _core; + private ValueTask> _source; + private int _continuationFlags; + private SubWrappedChannelReader? _borrowedFor; + + internal PooledValueTaskSource() + { + } + + internal ValueTask> ToValueTaskAsync(ValueTask> msg, INatsConnection? connection) + { + _connection = connection; + _source = msg; + + return new ValueTask>(this, _core.Version); + } + + private void InternalOnCompleted() + { + try + { +#pragma warning disable VSTHRD002 + _continuationFlags = _continuationFlags | 3; + _core.SetResult(_source.GetAwaiter().GetResult().ToNatsMsg(_connection)); +#pragma warning restore VSTHRD002 + } + catch (Exception e) + { + SetException(e); + } + } + + public void SetException(Exception exception) + { + if (_continuationFlags > 3) + return; + _continuationFlags = _continuationFlags | 8; + ThreadPool.UnsafeQueueUserWorkItem( + state => + { + state.self._core.SetException(state.exception); + }, + (self: this, exception), + preferLocal: false); + } + + NatsMsg IValueTaskSource>.GetResult(short token) + { + if (_core.Version == token && (_continuationFlags & 4) != 0) + { + var peek = Unsafe.As>, CheatingPeeker>>(ref _source); + if (peek._obj is IValueTaskSource> result) + { + var res = result.GetResult(peek._token).ToNatsMsg(_connection); + ResetAndTryReturn(); + return res; + } + else if (peek._obj == null) + { + var res = peek._result.ToNatsMsg(_connection); + ResetAndTryReturn(); + return res; + } + else + { + var res = ((Task>)peek._obj).Result.ToNatsMsg(_connection); + ResetAndTryReturn(); + return res; + } + + //var res = _source.Result.ToNatsMsg(_connection); + //ResetAndTryReturn(); + //return res; + } + else + { + return getResultSlow(token); + } + } + + private NatsMsg getResultSlow(short token) + { + try + { + if ((_continuationFlags & 1) == 0) + { + if (GetStatus(token) == ValueTaskSourceStatus.Succeeded && + _core.GetStatus(token) == ValueTaskSourceStatus.Pending) + { + return _source.Result.ToNatsMsg(_connection); + } + } + + return _core.GetResult(token); + } + finally + { + ResetAndTryReturn(); + } + } + + private void ResetAndTryReturn() + { + _core.Reset(); + _source = default; + //var p = _objectPool; + //_objectPool = null; + // canceled object don't return pool to avoid call SetResult/Exception after await + //if (p != null && !_noReturn) + if (_continuationFlags < 7) + { + //var b = _borrowedFor; + _continuationFlags = 0; + if (_borrowedFor is { _internalPooledSource: null }) + { + _borrowedFor._internalPooledSource = this; + //var other = Interlocked.Exchange(ref _borrowedFor._internalPooledSource!, this); + } + else + { + _borrowedFor = null; + if (_pool.Count < 128) + { + _pool.Enqueue(this); + } + } + } + } + + public ValueTaskSourceStatus GetStatus(short token) + { + //var status = _core.GetStatus(token); + if ((_continuationFlags & 4) != 0) + { + return ValueTaskSourceStatus.Succeeded; + } + else if (_source.IsCompletedSuccessfully && _continuationFlags == 0)// && status == ValueTaskSourceStatus.Pending) + { +#pragma warning disable VSTHRD002 + _continuationFlags = 4; + //_core.SetResult(_source.GetAwaiter().GetResult().ToNatsMsg(_connection)); +#pragma warning restore VSTHRD002 + return ValueTaskSourceStatus.Succeeded; + } + else if (Interlocked.CompareExchange(ref _continuationFlags, 1, 0) == 0) + { + return QueueContinuation(); + } + else + { + return _core.GetStatus(token); + } + } + + private ValueTaskSourceStatus QueueContinuation() + { + var peeker = + Unsafe + .As>, + CheatingPeeker>>(ref _source); + if (peeker._obj is IValueTaskSource> vts) + { + vts.OnCompleted( + static o => + { + var me = o as PooledValueTaskSource; + me!.InternalOnCompleted(); + }, + this, + peeker._token, + ValueTaskSourceOnCompletedFlags.UseSchedulingContext); + } + else + { + _source.GetAwaiter().OnCompleted(InternalOnCompleted); + } + + return ValueTaskSourceStatus.Pending; + } + + void IValueTaskSource>.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + { + _core.OnCompleted(continuation, state, token, flags); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static PooledValueTaskSource RentOrGet(SubWrappedChannelReader borrower) + { + //var self = Interlocked.Exchange(ref borrower._internalPooledSource, null); + if (_pool.TryDequeue(out var self) == false) + { + self = new PooledValueTaskSource(); + } + + self._borrowedFor = borrower; + //if (pool == null || pool.TryRent>(out var self) == false) + //{ + // self = new PooledValueTaskSource(); + //} + + //self._objectPool = pool; + + return self; + } + + private static readonly ConcurrentQueue> _pool = new ConcurrentQueue>(); +} diff --git a/src/NATS.Client.Core/Internal/InFlightNatsMsg.cs b/src/NATS.Client.Core/Internal/InFlightNatsMsg.cs new file mode 100644 index 000000000..364b612b8 --- /dev/null +++ b/src/NATS.Client.Core/Internal/InFlightNatsMsg.cs @@ -0,0 +1,134 @@ +using System.Buffers; +using System.Runtime.CompilerServices; + +namespace NATS.Client.Core.Internal; + +public readonly struct InFlightNatsMsg +{ + public static bool operator ==(InFlightNatsMsg left, InFlightNatsMsg right) => left.Equals(right); + + public static bool operator !=(InFlightNatsMsg left, InFlightNatsMsg right) => !left.Equals(right); + + public bool Equals(InFlightNatsMsg other) => + Subject == other.Subject + && Equals(_otherData, other._otherData) + && EqualityComparer.Default.Equals(Data, other.Data) + && Size == other.Size; + + public override bool Equals(object? obj) => obj is InFlightNatsMsg other && Equals(other); + + + public override int GetHashCode() + { + unchecked + { + var hashCode = Subject.GetHashCode(); + hashCode = (hashCode * 397) ^ (_otherData != null ? _otherData.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ EqualityComparer.Default.GetHashCode(Data); + hashCode = (hashCode * 397) ^ Size; + return hashCode; + } + } + + public readonly string Subject; + private readonly object? _otherData; + public readonly T? Data; + public readonly int Size; + private readonly int _headerType; + + public InFlightNatsMsg(string subject, string? replyTo, int size, NatsHeaders? headers, T? data) + { + Subject = subject; + if (headers == null) + { + _otherData = replyTo; + _headerType = replyTo == null ? 1 : 0; + } + else + { + headers.ReplyTo = replyTo; + _otherData = headers; + _headerType = 2; + } + + Data = data; + Size = size; + } + + public InFlightNatsMsg(string subject, int size, object? otherData, T? data, int dataType) + { + Subject = subject; + Size = size; + _otherData = otherData; + _headerType = dataType; + Data = data; + } + + public string? ReplyTo => _headerType == 2 ? (_otherData as NatsHeaders)?.ReplyTo : _otherData as string; + + public NatsHeaders? Headers => _otherData as NatsHeaders; + + internal static InFlightNatsMsg BuildInternal( + string subject, + string? replyTo, + in ReadOnlySequence? headersBuffer, + in ReadOnlySequence payloadBuffer, + INatsConnection? connection, + NatsHeaderParser headerParser, + INatsDeserialize serializer) + { + // Consider an empty payload as null or default value for value types. This way we are able to + // receive sentinels as nulls or default values. This might cause an issue with where we are not + // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero). + T? data = default; + int size = subject.Length; + if (payloadBuffer.Length > 0) + { + size = size + (int)payloadBuffer.Length; + data = serializer.Deserialize(payloadBuffer); + } + + NatsHeaders? headers = null; + + int _dataType = 0; + if (headersBuffer != null) + { + headers = new NatsHeaders(); + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + throw new NatsException("Error parsing headers"); + } + + size = size + (int)headersBuffer.Value.Length; + headers.SetReadOnly(); + _dataType = 2; + } + + if (replyTo != null) + { + size = size + replyTo.Length; + _dataType = _dataType + 1; + } + + //return new InFlightNatsMsg(subject, replyTo, (int) size, headers, data); + return new InFlightNatsMsg(subject, size, headers, data, _dataType); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public NatsMsg ToNatsMsg(INatsConnection? connection) + { + switch (_headerType) + { + case 0: + return new NatsMsg(Subject, null, Size, null, Data, connection); + case 1: + return new NatsMsg(Subject, _otherData as string, Size, null, Data, connection); + case 2: + var h = _otherData as NatsHeaders; + return new NatsMsg(Subject, h.ReplyTo, Size, h, Data, connection); + default: + return new NatsMsg(Subject, _otherData as string, Size, null, Data, connection); + + } + } +} diff --git a/src/NATS.Client.Core/Internal/SubWrappedChannel.cs b/src/NATS.Client.Core/Internal/SubWrappedChannel.cs new file mode 100644 index 000000000..bbe9a9c8f --- /dev/null +++ b/src/NATS.Client.Core/Internal/SubWrappedChannel.cs @@ -0,0 +1,13 @@ +using System.Threading.Channels; + +namespace NATS.Client.Core.Internal; + +internal sealed class SubWrappedChannel : Channel, NatsMsg> +{ + public SubWrappedChannel(Channel> channel, INatsConnection connection) + { + SubWrappedChannelReader readChannel = new(channel, connection); + Writer = channel.Writer; + Reader = readChannel; + } +} diff --git a/src/NATS.Client.Core/Internal/SubWrappedChannelReader.cs b/src/NATS.Client.Core/Internal/SubWrappedChannelReader.cs new file mode 100644 index 000000000..3e658b1fd --- /dev/null +++ b/src/NATS.Client.Core/Internal/SubWrappedChannelReader.cs @@ -0,0 +1,76 @@ +using System.Threading.Channels; +using NATS.Client.Core.Commands; + +namespace NATS.Client.Core.Internal; + +public sealed class SubWrappedChannelReader : ChannelReader> +{ + private readonly ChannelReader> _channel; + private readonly INatsConnection? _connection; + internal PooledValueTaskSource? _internalPooledSource; + + + internal SubWrappedChannelReader(ChannelReader> channel, INatsConnection? connection) + { + _channel = channel; + _connection = connection; + } + + public override bool CanPeek => base.CanPeek; + + public override Task Completion => _channel.Completion; + + public override bool CanCount => _channel.CanCount; + + public override int Count => _channel.Count; + + public override ValueTask> ReadAsync(CancellationToken cancellationToken = default) + { + var read = _channel.ReadAsync(cancellationToken); + if (read.IsCompletedSuccessfully) + { + return new ValueTask>(read.GetAwaiter().GetResult().ToNatsMsg(_connection)); + } + else + { + return doReadSlow(read); + } + } + + private ValueTask> doReadSlow(ValueTask> read) + { + PooledValueTaskSource? pvts = null; + if ((pvts = Interlocked.Exchange(ref _internalPooledSource, null)) == null) + { + pvts = PooledValueTaskSource.RentOrGet(this); + } + + return pvts.ToValueTaskAsync(read, _connection); + } + + public override ValueTask WaitToReadAsync(CancellationToken cancellationToken = default) => _channel.WaitToReadAsync(cancellationToken); + + public override bool TryRead(out NatsMsg item) + { + if (_channel.TryRead(out var inFlight)) + { + item = inFlight.ToNatsMsg(_connection); + return true; + } + + item = default; + return false; + } + + public override bool TryPeek(out NatsMsg item) + { + if (_channel.TryPeek(out var inFlight)) + { + item = inFlight.ToNatsMsg(_connection); + return true; + } + + item = default; + return false; + } +} diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs index e63ff0de3..f8d43fc23 100644 --- a/src/NATS.Client.Core/NatsHeaders.cs +++ b/src/NATS.Client.Core/NatsHeaders.cs @@ -1,3 +1,4 @@ +using System.Buffers; using System.Collections; using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.Primitives; @@ -72,6 +73,8 @@ public enum Messages public Messages Message { get; internal set; } = Messages.Text; + internal string? ReplyTo { get; set; } + /// /// Initializes a new instance of . /// diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 53bc01750..1c7f318e3 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -120,42 +121,7 @@ public readonly record struct NatsMsg( T? Data, INatsConnection? Connection) : INatsMsg { - internal static NatsMsg Build( - string subject, - string? replyTo, - in ReadOnlySequence? headersBuffer, - in ReadOnlySequence payloadBuffer, - INatsConnection? connection, - NatsHeaderParser headerParser, - INatsDeserialize serializer) - { - // Consider an empty payload as null or default value for value types. This way we are able to - // receive sentinels as nulls or default values. This might cause an issue with where we are not - // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero). - var data = payloadBuffer.Length > 0 - ? serializer.Deserialize(payloadBuffer) - : default; - - NatsHeaders? headers = null; - - if (headersBuffer != null) - { - headers = new NatsHeaders(); - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } - - headers.SetReadOnly(); - } - - var size = subject.Length - + (replyTo?.Length ?? 0) - + (headersBuffer?.Length ?? 0) - + payloadBuffer.Length; - - return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); - } + public bool IsNoRespondersError => Headers?.Code == 503; /// /// Reply with an empty message. diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index b10439411..c70d72858 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -7,7 +7,7 @@ namespace NATS.Client.Core; public sealed class NatsSub : NatsSubBase, INatsSub { - private readonly Channel> _msgs; + private readonly SubWrappedChannel _msgs; internal NatsSub( NatsConnection connection, @@ -19,9 +19,11 @@ internal NatsSub( CancellationToken cancellationToken = default) : base(connection, manager, subject, queueGroup, opts, cancellationToken) { - _msgs = Channel.CreateBounded>( - connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts), - msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); + _msgs = new SubWrappedChannel( + Channel.CreateBounded>( + connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts), + msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg.ToNatsMsg(Connection))), + connection); Serializer = serializer; } @@ -32,7 +34,7 @@ internal NatsSub( protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var natsMsg = NatsMsg.Build( + var natsMsg = InFlightNatsMsg.BuildInternal( subject, replyTo, headersBuffer, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 14f660433..359e55df2 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; +using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -327,7 +328,7 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( + InFlightNatsMsg.BuildInternal( subject, replyTo, headersBuffer, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index 9804ef64e..87abf1558 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; +using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -228,7 +229,7 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( + InFlightNatsMsg.BuildInternal( subject, replyTo, headersBuffer, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs index 48b500bec..312bd35ab 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; +using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -272,7 +273,7 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( + InFlightNatsMsg.BuildInternal( subject, replyTo, headersBuffer, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 48401196a..7b78af591 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -438,7 +438,7 @@ protected override async ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + var msg = new NatsJSMsg(InFlightNatsMsg.BuildInternal(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); } diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 9358f1228..bb3d693ee 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -2,6 +2,7 @@ using System.Diagnostics.CodeAnalysis; using System.Text; using NATS.Client.Core; +using NATS.Client.Core.Internal; using NATS.Client.JetStream.Internal; namespace NATS.Client.JetStream; @@ -130,10 +131,16 @@ public interface INatsJSMsg public readonly struct NatsJSMsg : INatsJSMsg { private readonly NatsJSContext _context; - private readonly NatsMsg _msg; + private readonly InFlightNatsMsg _msg; private readonly Lazy _replyToDateTimeAndSeq; public NatsJSMsg(NatsMsg msg, NatsJSContext context) + : this( + new InFlightNatsMsg(msg.Subject, msg.ReplyTo, msg.Size, msg.Headers, msg.Data), context) + { + } + + internal NatsJSMsg(InFlightNatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; @@ -169,7 +176,7 @@ public NatsJSMsg(NatsMsg msg, NatsJSContext context) /// /// The connection messages was delivered on. /// - public INatsConnection? Connection => _msg.Connection; + public INatsConnection? Connection => _context.Connection; /// /// Additional metadata about the message. @@ -187,7 +194,7 @@ public NatsJSMsg(NatsMsg msg, NatsJSContext context) /// A used to cancel the command. /// A that represents the asynchronous send operation. public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) => - _msg.ReplyAsync(headers, replyTo, opts, cancellationToken); + _msg.ToNatsMsg(_context.Connection).ReplyAsync(headers, replyTo, opts, cancellationToken); /// /// Acknowledges the message was completely handled. @@ -251,7 +258,7 @@ private async ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opt { CheckPreconditions(); - if (_msg == default) + if (_msg.Subject == null) throw new NatsJSException("No user message, can't acknowledge"); if ((opts.DoubleAck ?? _context.Opts.AckOpts.DoubleAck) == true) @@ -265,7 +272,7 @@ private async ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opt } else { - await _msg.ReplyAsync( + await _msg.ToNatsMsg(_context.Connection).ReplyAsync( data: payload, opts: new NatsPubOpts { diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index faf15fda8..2f38387df 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -1,6 +1,7 @@ using System.Buffers; using System.Threading.Channels; using NATS.Client.Core; +using NATS.Client.Core.Internal; using NATS.Client.JetStream; namespace NATS.Client.KeyValueStore.Internal; @@ -54,7 +55,7 @@ protected override async ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + var msg = new NatsJSMsg(InFlightNatsMsg.BuildInternal(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); await _commands.WriteAsync(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); } diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index b13a722ff..aca8df72d 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -190,7 +190,7 @@ protected override ValueTask ReceiveInternalAsync( Exception? exception; try { - msg = NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer); + msg = InFlightNatsMsg.BuildInternal(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer).ToNatsMsg(_nats); exception = null; } catch (Exception e)