diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index f49030732..f10d8fdaa 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -72,12 +72,12 @@ public void Monitoring() int port = rep.BindRandomPort("tcp://127.0.0.1"); reqMonitor.Connected += (s, e) => connectedEvent.Set(); + req.Connect("tcp://127.0.0.1:" + port); reqMonitor.AttachToPoller(poller); poller.RunAsync(); - req.Connect("tcp://127.0.0.1:" + port); req.SendFrame("a"); rep.SkipFrame(); @@ -347,6 +347,8 @@ public void RemoveSocket() // identity e.Socket.SkipFrame(); + //**Note: bad to assert from worker thread! + // If it fails, the test will crash, not report failure! Assert.Equal("Hello", e.Socket.ReceiveFrameString(out bool more)); Assert.False(more); @@ -400,6 +402,151 @@ public void RemoveSocket() } } + [Fact] + public async Task RemoveAndDisposeSocketAsync() + { + //set up poller, start it + var patient = new NetMQPoller(); + patient.RunAsync(); + + Assert.True(patient.IsRunning); + + //create a pub-sub pair + var port = 55667; + var conn = $"tcp://127.0.0.1:{port}"; + + var pub = new PublisherSocket(); + pub.Bind(conn); + + var sub = new SubscriberSocket(); + sub.Connect(conn); + sub.SubscribeToAnyTopic(); + + //handle callbacks from poller thread + sub.ReceiveReady += (s, e) => + { + var msg = e.Socket.ReceiveFrameString(); + + Debug.WriteLine($"sub has data: {msg}"); + }; + + //add the subscriber socket to poller + patient.Add(sub); + + //set up pub on separate thread + var canceller = new CancellationTokenSource(); + + var pubAction = new Action(async () => + { + var token = canceller.Token; + + uint i = 0; + + while (!token.IsCancellationRequested) + { + pub.SendFrame($"Hello-{++i}"); + + // send ~ 5Hz + await Task.Delay(200); + } + }); + + Task.Run(pubAction); + + //allow a little time to run + await Task.Delay(2000); + + //now try to remove the sub from poller + await patient.RemoveAndDisposeAsync(sub); + + Assert.True(sub.IsDisposed); + + //allow for poller to continue running + await Task.Delay(2000); + + patient.Stop(); + Assert.False(patient.IsRunning); + + canceller.Cancel(); + + pub?.Dispose(); + patient?.Dispose(); + } + + [Fact] + public async Task DisposeSocketAfterAsyncRemoval() + { + //set up poller, start it + var patient = new NetMQPoller(); + patient.RunAsync(); + + Assert.True(patient.IsRunning); + + //create a pub-sub pair + var port = 55667; + var conn = $"tcp://127.0.0.1:{port}"; + + var pub = new PublisherSocket(); + pub.Bind(conn); + + var sub = new SubscriberSocket(); + sub.Connect(conn); + sub.SubscribeToAnyTopic(); + + //handle callbacks from poller thread + sub.ReceiveReady += (s, e) => + { + var msg = e.Socket.ReceiveFrameString(); + + Debug.WriteLine($"sub has data: {msg}"); + }; + + //add the subscriber socket to poller + patient.Add(sub); + + //set up pub on separate thread + var canceller = new CancellationTokenSource(); + + var pubAction = new Action(async () => + { + var token = canceller.Token; + + uint i = 0; + + while(!token.IsCancellationRequested) + { + pub.SendFrame($"Hello-{++i}"); + + // send ~ 5Hz + await Task.Delay(200); + } + }); + + var pubThread = Task.Run(pubAction); + + //allow a little time to run + await Task.Delay(2000); + + //now try to remove the sub from poller + await patient.RemoveAsync(sub); + + // dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught! + sub.Dispose(); + sub = null; + + //allow for poller to continue running + await Task.Delay(2000); + + patient.Stop(); + Assert.False(patient.IsRunning); + + canceller.Cancel(); + + pub?.Dispose(); + patient?.Dispose(); + canceller?.Dispose(); + } + [Fact] public void AddThrowsIfSocketAlreadyDisposed() { @@ -453,6 +600,7 @@ public void DisposeThrowsIfSocketAlreadyDisposed() // Dispose the socket. // It is incorrect to have a disposed socket in a poller. // Disposed sockets can throw into the poller's thread. + socket.Dispose(); // Dispose throws if a polled socket is disposed @@ -562,6 +710,61 @@ public void RemoveTimer() } } + [Fact] + public async void RemoveTimer_Async() + { + using (var router = new RouterSocket()) + using (var dealer = new DealerSocket()) + using (var poller = new NetMQPoller { router }) + { + int port = router.BindRandomPort("tcp://127.0.0.1"); + + dealer.Connect("tcp://127.0.0.1:" + port); + + bool timerTriggered = false; + + var timer = new NetMQTimer(TimeSpan.FromMilliseconds(500)); + timer.Elapsed += (s, a) => { timerTriggered = true; }; + + // The timer will fire after 100ms + poller.Add(timer); + + bool messageArrived = false; + + router.ReceiveReady += (s, e) => + { + router.SkipFrame(); + router.SkipFrame(); + messageArrived = true; + //// Remove timer + //poller.Remove(timer); + }; + + poller.RunAsync(); + + Thread.Sleep(20); + + dealer.SendFrame("hello"); + + Thread.Sleep(300); + + try + { + await poller.RemoveAsync(timer); + } + catch (Exception ex) + { + //ensure no exceptions thrown + Assert.Null(ex); + } + + poller.Stop(); + + Assert.True(messageArrived); + Assert.False(timerTriggered); + } + } + [Fact] public void RunMultipleTimes() { @@ -859,6 +1062,60 @@ public void NativeSocket() } } + [Fact] + public async void NativeSocket_RemoveAsync() + { + using (var streamServer = new StreamSocket()) + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + int port = streamServer.BindRandomPort("tcp://*"); + + socket.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port)); + + var buffer = new byte[] { 1 }; + socket.Send(buffer); + + byte[] identity = streamServer.ReceiveFrameBytes(); + byte[] message = streamServer.ReceiveFrameBytes(); + + Assert.Equal(buffer[0], message[0]); + + var socketSignal = new ManualResetEvent(false); + + using (var poller = new NetMQPoller()) + { + poller.Add(socket, s => + { + socket.Receive(buffer); + + socketSignal.Set(); + }); + + poller.RunAsync(); + + // no message is waiting for the socket so it should fail + Assert.False(socketSignal.WaitOne(100)); + + // sending a message back to the socket + streamServer.SendMoreFrame(identity).SendFrame("a"); + + Assert.True(socketSignal.WaitOne(100)); + + socketSignal.Reset(); + + await poller.RemoveAsync(socket); + + // sending a message back to the socket + streamServer.SendMoreFrame(identity).SendFrame("a"); + + // we remove the native socket so it should fail + Assert.False(socketSignal.WaitOne(100)); + + poller.Stop(); + } + } + } + #endregion #region TaskScheduler tests diff --git a/src/NetMQ/ISocketPollableCollection.cs b/src/NetMQ/ISocketPollableCollection.cs index a4654a51f..fc1bbc7a8 100644 --- a/src/NetMQ/ISocketPollableCollection.cs +++ b/src/NetMQ/ISocketPollableCollection.cs @@ -10,7 +10,7 @@ namespace NetMQ /// /// This interface provides an abstraction over the legacy Poller and newer classes for use in . /// - [Obsolete("Use INetMQPoller instead")] + [Obsolete("Use INetMQPoller instead. This should be made internal, to avoid major re-work of NetMQMonitor, but prevent accidental mis-use from applications")] public interface ISocketPollableCollection { /// diff --git a/src/NetMQ/ISocketPollableCollectionAsync.cs b/src/NetMQ/ISocketPollableCollectionAsync.cs new file mode 100644 index 000000000..004613e75 --- /dev/null +++ b/src/NetMQ/ISocketPollableCollectionAsync.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; +using JetBrains.Annotations; + +namespace NetMQ +{ + /// + /// + /// + /// + /// This interface provides an abstraction over the legacy Poller and newer classes for use in and avoids thread sync issues removing sockets. + /// + public interface ISocketPollableCollectionAsync + { + Task RemoveAsync([NotNull] ISocketPollable socket); + Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDisposable; + } +} diff --git a/src/NetMQ/NetMQ.csproj b/src/NetMQ/NetMQ.csproj index 77c8a6244..e46eb06f6 100644 --- a/src/NetMQ/NetMQ.csproj +++ b/src/NetMQ/NetMQ.csproj @@ -57,4 +57,43 @@ + + + + + + + + + + 1.0.168 + + + + + + 1.0.168 + + + + + + 1.0.168 + + + diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index b5be7187e..6dae52b1f 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -29,7 +29,7 @@ public sealed class NetMQPoller : ISynchronizeInvoke, #endif #pragma warning disable 618 - INetMQPoller, ISocketPollableCollection, IEnumerable, IDisposable + INetMQPoller, ISocketPollableCollection, ISocketPollableCollectionAsync, IEnumerable, IDisposable #pragma warning restore 618 { private readonly List m_sockets = new List(); @@ -120,10 +120,47 @@ protected override void QueueTask(Task task) m_tasksQueue.Enqueue(task); } + internal Task RunAsync([NotNull] Action action) + { + Task t; + + if (!IsRunning || CanExecuteTaskInline) + { + action(); + + t = FromResult(null); + } + else + { + t = new Task(action); + t.Start(this); + } + + return t; + } + + internal Task RunAsync([NotNull] Func action) + { + Task t; + + if (!IsRunning || CanExecuteTaskInline) + { + t = FromResult(action()); + } + else + { + t = new Task(action); + t.Start(this); + } + + return t; + } + /// /// Run an action on the Poller thread /// /// The action to run + [Obsolete("Queues the action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() to remove sockets or timers")] public void Run([NotNull] Action action) { if (!IsRunning || CanExecuteTaskInline) @@ -131,8 +168,27 @@ public void Run([NotNull] Action action) else new Task(action).Start(this); } -#else - private void Run(Action action) + + /// + /// Provides a completed task with the result for a syncronously run action. + /// this provides a shim for 4.0 and 4.5 compatibility. + /// + /// + /// + /// + private static Task FromResult(TResult result) + { +#if NET40 + var tcs = new TaskCompletionSource(); + tcs.SetResult(result); + return tcs.Task; +#else //NET45+ + return Task.FromResult(result); +#endif + } + +#else //NET35 + private void Run(Action action) { action(); } @@ -193,7 +249,7 @@ public void Add(ISocketPollable socket) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + RunAsync(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -225,7 +281,7 @@ public void Add([NotNull] NetMQTimer timer) throw new ArgumentNullException(nameof(timer)); CheckDisposed(); - Run(() => m_timers.Add(timer)); + RunAsync(() => m_timers.Add(timer)); } /// @@ -243,7 +299,7 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) throw new ArgumentNullException(nameof(callback)); CheckDisposed(); - Run(() => + RunAsync(() => { if (m_pollinSockets.ContainsKey(socket)) return; @@ -259,15 +315,30 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) /// If socket is null /// If socket is already disposed /// If socket is getting disposed during the operation + [Obsolete("Queues the action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove(ISocketPollable socket) + { + // keeps the current public API intact, though flawed (no way to know when the task actually removes the socket). + RemoveAsync(socket); + } + + /// + /// Remove a socket from the poller + /// + /// The socket to be removed + /// If socket is null + /// If socket is already disposed + /// If socket is getting disposed during the operation + public Task RemoveAsync(ISocketPollable socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); + if (socket.IsDisposed) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + return RunAsync(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -291,7 +362,21 @@ public void Remove(ISocketPollable socket) /// If socket is null /// If socket is disposed /// If socket got disposed during the operation + [Obsolete("Queues the action on the poller's thread, but provides no sync mechanism. Please use RemoveAndDisposeAsync() instead")] public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable + { + // this implementation maintains *current* (flawed) behavior, since the task is not awaited + RemoveAndDisposeAsync(socket); + } + + /// + /// Remove the socket from the poller and dispose the socket + /// + /// The socket to be removed + /// If socket is null + /// If socket is disposed + /// If socket got disposed during the operation + public Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDisposable { if (socket == null) throw new ArgumentNullException(nameof(socket)); @@ -299,7 +384,7 @@ public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + return RunAsync(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -322,7 +407,18 @@ public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable /// /// The timer to remove /// If poller is null + [Obsolete("Queues an action on the poller's thread to remove the timer, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove([NotNull] NetMQTimer timer) + { + RemoveAsync(timer); + } + + /// + /// Remove a timer from the poller + /// + /// The timer to remove + /// If poller is null + public Task RemoveAsync(NetMQTimer timer) { if (timer == null) throw new ArgumentNullException(nameof(timer)); @@ -330,7 +426,7 @@ public void Remove([NotNull] NetMQTimer timer) timer.When = -1; - Run(() => m_timers.Remove(timer)); + return RunAsync(() => m_timers.Remove(timer)); } /// @@ -338,13 +434,24 @@ public void Remove([NotNull] NetMQTimer timer) /// /// The socket to remove /// If socket is null + [Obsolete("Queues an action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove([NotNull] Socket socket) + { + RemoveAsync(socket); + } + + /// + /// Remove the .Net socket from the poller + /// + /// The socket to remove + /// If socket is null + public Task RemoveAsync([NotNull] Socket socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); CheckDisposed(); - Run(() => + return RunAsync(() => { m_pollinSockets.Remove(socket); m_isPollSetDirty = true; @@ -368,9 +475,7 @@ public Task ContainsAsync([NotNull] ISocketPollable socket) throw new ArgumentNullException(nameof(socket)); CheckDisposed(); - var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_sockets.Contains(socket))); - return tcs.Task; + return RunAsync(new Func(() => m_sockets.Contains(socket))); } /// @@ -384,9 +489,7 @@ public Task ContainsAsync([NotNull] NetMQTimer timer) throw new ArgumentNullException(nameof(timer)); CheckDisposed(); - var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_timers.Contains(timer))); - return tcs.Task; + return RunAsync(new Func(() => m_timers.Contains(timer))); } /// @@ -402,7 +505,7 @@ public Task ContainsAsync([NotNull] Socket socket) CheckDisposed(); var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_pollinSockets.ContainsKey(socket))); + RunAsync(() => tcs.SetResult(m_pollinSockets.ContainsKey(socket))); return tcs.Task; } #endif @@ -764,7 +867,11 @@ public void Dispose() m_sockets.Remove(((ISocketPollable)m_tasksQueue).Socket); m_tasksQueue.Dispose(); #endif - + // JASells: this is running prematurely in test NetMWPollerTests.Monitoring + // causing a objectDisposed exception in NetMQSelector.Select ~line 146 + // Fixed by adding socket.IsDisposed check in NetMQSelector.Select @line 144. + // This check will also avoid servicing disposed sockets generally... + // and so we can remove the disposed checks on sockets in Remove, RemoveAsync, RemoveAndDispose, RemvoeAndDisposeASync foreach (var socket in m_sockets) { if (socket.IsDisposed) diff --git a/src/NetMQ/NetMQSelector.cs b/src/NetMQ/NetMQSelector.cs index e23802cc8..861453ec1 100644 --- a/src/NetMQ/NetMQSelector.cs +++ b/src/NetMQ/NetMQSelector.cs @@ -166,7 +166,7 @@ public bool Select([NotNull] Item[] items, int itemsCount, long timeout) selectItem.ResultEvent = PollEvents.None; - if (selectItem.Socket != null) + if (selectItem.Socket != null && !selectItem.Socket.IsDisposed) { var events = (PollEvents)selectItem.Socket.GetSocketOption(ZmqSocketOption.Events);