HotObservable.cs 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive;
  7. using System.Reactive.Disposables;
  8. namespace Microsoft.Reactive.Testing
  9. {
  10. internal class HotObservable<T> : ITestableObservable<T>
  11. {
  12. private readonly TestScheduler _scheduler;
  13. private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
  14. private readonly List<Subscription> _subscriptions = new List<Subscription>();
  15. private readonly Recorded<Notification<T>>[] _messages;
  16. public HotObservable(TestScheduler scheduler, params Recorded<Notification<T>>[] messages)
  17. {
  18. _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
  19. _messages = messages ?? throw new ArgumentNullException(nameof(messages));
  20. for (var i = 0; i < messages.Length; ++i)
  21. {
  22. var notification = messages[i].Value;
  23. scheduler.ScheduleAbsolute(default(object), messages[i].Time, (scheduler1, state1) =>
  24. {
  25. var _observers = this._observers.ToArray();
  26. for (var j = 0; j < _observers.Length; ++j)
  27. {
  28. notification.Accept(_observers[j]);
  29. }
  30. return Disposable.Empty;
  31. });
  32. }
  33. }
  34. public virtual IDisposable Subscribe(IObserver<T> observer)
  35. {
  36. if (observer == null)
  37. {
  38. throw new ArgumentNullException(nameof(observer));
  39. }
  40. _observers.Add(observer);
  41. _subscriptions.Add(new Subscription(_scheduler.Clock));
  42. var index = _subscriptions.Count - 1;
  43. return Disposable.Create(() =>
  44. {
  45. _observers.Remove(observer);
  46. _subscriptions[index] = new Subscription(_subscriptions[index].Subscribe, _scheduler.Clock);
  47. });
  48. }
  49. public IList<Subscription> Subscriptions
  50. {
  51. get { return _subscriptions; }
  52. }
  53. public IList<Recorded<Notification<T>>> Messages
  54. {
  55. get { return _messages; }
  56. }
  57. }
  58. }