SingleSubscriberObservableBase.cs 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. using System;
  2. using Avalonia.Threading;
  3. namespace Avalonia.Reactive
  4. {
  5. public abstract class SingleSubscriberObservableBase<T> : IObservable<T>, IDisposable
  6. {
  7. private Exception _error;
  8. private IObserver<T> _observer;
  9. private bool _completed;
  10. public IDisposable Subscribe(IObserver<T> observer)
  11. {
  12. Contract.Requires<ArgumentNullException>(observer != null);
  13. Dispatcher.UIThread.VerifyAccess();
  14. if (_observer != null)
  15. {
  16. throw new InvalidOperationException("The observable can only be subscribed once.");
  17. }
  18. if (_error != null)
  19. {
  20. observer.OnError(_error);
  21. }
  22. else if (_completed)
  23. {
  24. observer.OnCompleted();
  25. }
  26. else
  27. {
  28. _observer = observer;
  29. Subscribed();
  30. }
  31. return this;
  32. }
  33. public virtual void Dispose()
  34. {
  35. Unsubscribed();
  36. _observer = null;
  37. }
  38. protected abstract void Unsubscribed();
  39. protected void PublishNext(T value)
  40. {
  41. _observer?.OnNext(value);
  42. }
  43. protected void PublishCompleted()
  44. {
  45. if (_observer != null)
  46. {
  47. _observer.OnCompleted();
  48. _completed = true;
  49. Unsubscribed();
  50. _observer = null;
  51. }
  52. }
  53. protected void PublishError(Exception error)
  54. {
  55. if (_observer != null)
  56. {
  57. _observer.OnError(error);
  58. _error = error;
  59. Unsubscribed();
  60. _observer = null;
  61. }
  62. }
  63. protected abstract void Subscribed();
  64. }
  65. }