|
@@ -1,4 +1,5 @@
|
|
|
-using System.Collections.Concurrent;
|
|
|
|
|
|
|
+using System;
|
|
|
|
|
+using System.Collections.Concurrent;
|
|
|
using System.Collections.Generic;
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
using System.Linq;
|
|
|
using System.Threading;
|
|
using System.Threading;
|
|
@@ -332,6 +333,41 @@ namespace Abc.Zebus.Tests.Core
|
|
|
_messageDispatcherMock.Verify(x => x.AddInvoker(It.Is<IMessageHandlerInvoker>(invoker => invoker.MessageType == typeof(FakeCommand))), Times.Once);
|
|
_messageDispatcherMock.Verify(x => x.AddInvoker(It.Is<IMessageHandlerInvoker>(invoker => invoker.MessageType == typeof(FakeCommand))), Times.Once);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ [Test]
|
|
|
|
|
+ public async Task should_wait_for_unsubscribe_to_complete_before_adding_new_subscriptions()
|
|
|
|
|
+ {
|
|
|
|
|
+ _bus.Start();
|
|
|
|
|
+
|
|
|
|
|
+ _directoryMock.Setup(i => i.UpdateSubscriptionsAsync(It.IsAny<IBus>(), It.IsAny<IEnumerable<SubscriptionsForType>>()))
|
|
|
|
|
+ .Returns(Task.CompletedTask);
|
|
|
|
|
+
|
|
|
|
|
+ var subA = await _bus.SubscribeAsync(Subscription.Any<FakeCommand>(), msg => { }).ConfigureAwait(true);
|
|
|
|
|
+
|
|
|
|
|
+ var unsubscribeTcs = new TaskCompletionSource<object>();
|
|
|
|
|
+ var unsubscribeSent = false;
|
|
|
|
|
+ _directoryMock.Setup(i => i.UpdateSubscriptionsAsync(It.IsAny<IBus>(), It.IsAny<IEnumerable<SubscriptionsForType>>()))
|
|
|
|
|
+ .Callback(() => unsubscribeSent = true)
|
|
|
|
|
+ .Returns(unsubscribeTcs.Task);
|
|
|
|
|
+
|
|
|
|
|
+ subA.Dispose();
|
|
|
|
|
+ Wait.Until(() => unsubscribeSent, 2.Seconds());
|
|
|
|
|
+
|
|
|
|
|
+ var newSubscribeSent = false;
|
|
|
|
|
+ _directoryMock.Setup(i => i.UpdateSubscriptionsAsync(It.IsAny<IBus>(), It.IsAny<IEnumerable<SubscriptionsForType>>()))
|
|
|
|
|
+ .Callback(() => newSubscribeSent = true)
|
|
|
|
|
+ .Returns(Task.CompletedTask);
|
|
|
|
|
+
|
|
|
|
|
+ var subBTask = _bus.SubscribeAsync(Subscription.Any<FakeCommand>(), msg => { });
|
|
|
|
|
+
|
|
|
|
|
+ Thread.Sleep(200);
|
|
|
|
|
+ newSubscribeSent.ShouldBeFalse();
|
|
|
|
|
+
|
|
|
|
|
+ unsubscribeTcs.SetResult(null);
|
|
|
|
|
+ Wait.Until(() => newSubscribeSent, 2.Seconds());
|
|
|
|
|
+
|
|
|
|
|
+ await subBTask.ConfigureAwait(true);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
[Test]
|
|
[Test]
|
|
|
[Ignore("The implementation is non trivial and will be dealt with later")]
|
|
[Ignore("The implementation is non trivial and will be dealt with later")]
|
|
|
public void subscriptions_sent_to_the_directory_should_always_be_more_recent_than_the_previous()
|
|
public void subscriptions_sent_to_the_directory_should_always_be_more_recent_than_the_previous()
|