Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions csharp/Platform.Protocols/Udp/UdpReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Linq;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Platform.Disposables;
using Platform.Exceptions;
using Platform.Threading;
Expand All @@ -26,7 +28,8 @@ public class UdpReceiver : DisposableBase //-V3073
{
private const int DefaultPort = 15000;
private bool _receiverRunning;
private Thread _thread;
private Task? _receiverTask;
private CancellationTokenSource _cancellationTokenSource;
private readonly UdpClient _udp;
private readonly MessageHandlerCallback _messageHandler;

Expand Down Expand Up @@ -65,6 +68,7 @@ public UdpReceiver(int listenPort, bool autoStart, MessageHandlerCallback messag
{
_udp = new UdpClient(listenPort);
_messageHandler = messageHandler;
_cancellationTokenSource = new CancellationTokenSource();
if (autoStart)
{
Start();
Expand Down Expand Up @@ -119,11 +123,10 @@ public UdpReceiver() : this(DefaultPort, true, message => { }) { }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Start()
{
if (!_receiverRunning && _thread == null)
if (!_receiverRunning && _receiverTask == null)
{
_receiverRunning = true;
_thread = new Thread(Receiver);
_thread.Start();
_receiverTask = Task.Run(() => Receiver(_cancellationTokenSource.Token));
}
}

Expand All @@ -136,11 +139,19 @@ public void Start()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Stop()
{
if (_receiverRunning && _thread != null)
if (_receiverRunning && _receiverTask != null)
{
_receiverRunning = false;
_thread.Join();
_thread = null;
_cancellationTokenSource.Cancel();
try
{
_receiverTask.Wait();
}
catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException))
{
// Expected when cancellation is requested
}
_receiverTask = null;
}
}

Expand All @@ -166,9 +177,9 @@ public void Stop()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ReceiveAndHandle() => _messageHandler(Receive());
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Receiver()
private void Receiver(CancellationToken cancellationToken)
{
while (_receiverRunning)
while (_receiverRunning && !cancellationToken.IsCancellationRequested)
{
try
{
Expand Down Expand Up @@ -208,6 +219,7 @@ protected override void Dispose(bool manual, bool wasDisposed)
if (!wasDisposed)
{
Stop();
_cancellationTokenSource?.Dispose();
_udp.DisposeIfPossible();
}
}
Expand Down
Loading