ColdObservable.cs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive;
  7. using System.Reactive.Disposables;
  namespace Microsoft.Reactive.Testing
  {
      /// <summary>
      /// Testable observable that, on every subscription, schedules its recorded
      /// notifications on a <see cref="TestScheduler"/> via <c>ScheduleRelative</c>,
      /// and keeps a record of each subscription it has handed out.
      /// </summary>
      /// <typeparam name="T">Element type carried by the recorded notifications and delivered to observers.</typeparam>
      internal class ColdObservable<T> : ITestableObservable<T>
      {
          private readonly TestScheduler _scheduler;
          private readonly Recorded<Notification<T>>[] _messages;
          private readonly List<Subscription> _subscriptions = new List<Subscription>();

          /// <summary>
          /// Creates the observable from a scheduler and the notifications to replay.
          /// </summary>
          /// <param name="scheduler">Scheduler used to schedule notifications and to read the clock.</param>
          /// <param name="messages">Recorded notifications; each one's <c>Time</c> is passed as the due time when scheduling.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="scheduler"/> or <paramref name="messages"/> is <c>null</c>.
          /// </exception>
          public ColdObservable(TestScheduler scheduler, params Recorded<Notification<T>>[] messages)
          {
              _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
              _messages = messages ?? throw new ArgumentNullException(nameof(messages));
          }

          /// <summary>
          /// Records a new subscription stamped with the scheduler's current clock value and
          /// schedules every recorded notification for delivery to <paramref name="observer"/>.
          /// </summary>
          /// <param name="observer">Observer that accepts the scheduled notifications.</param>
          /// <returns>
          /// A disposable that replaces the recorded subscription with one that also carries
          /// the clock value at disposal, and then disposes all scheduled work.
          /// </returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public virtual IDisposable Subscribe(IObserver<T> observer)
          {
              if (observer is null)
              {
                  throw new ArgumentNullException(nameof(observer));
              }

              // The new entry is appended, so its slot is the count before the add.
              var slot = _subscriptions.Count;
              _subscriptions.Add(new Subscription(_scheduler.Clock));

              var scheduledWork = new CompositeDisposable();

              foreach (var recorded in _messages)
              {
                  // Per-iteration local so each scheduled action delivers its own notification.
                  var notification = recorded.Value;

                  var pending = _scheduler.ScheduleRelative(default(object), recorded.Time, (sched, unusedState) =>
                  {
                      notification.Accept(observer);
                      return Disposable.Empty;
                  });

                  scheduledWork.Add(pending);
              }

              return Disposable.Create(() =>
              {
                  // Keep the original subscribe time and stamp the current clock alongside it
                  // before the outstanding scheduled work is disposed.
                  var subscribedAt = _subscriptions[slot].Subscribe;
                  _subscriptions[slot] = new Subscription(subscribedAt, _scheduler.Clock);
                  scheduledWork.Dispose();
              });
          }

          /// <summary>
          /// Gets the list of recorded subscriptions (the internal list itself, not a copy).
          /// </summary>
          public IList<Subscription> Subscriptions => _subscriptions;

          /// <summary>
          /// Gets the recorded notifications supplied at construction (the same array, not a copy).
          /// </summary>
          public IList<Recorded<Notification<T>>> Messages => _messages;
      }
  }