LightweightObservableBase.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Reactive;
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using Avalonia.Threading;
  7. namespace Avalonia.Reactive
  8. {
  9. /// <summary>
  10. /// Lightweight base class for observable implementations.
  11. /// </summary>
  12. /// <typeparam name="T">The observable type.</typeparam>
  13. /// <remarks>
  14. /// <see cref="ObservableBase{T}"/> is rather heavyweight in terms of allocations and memory
  15. /// usage. This class provides a more lightweight base for some internal observable types
  16. /// in the Avalonia framework.
  17. /// </remarks>
  18. public abstract class LightweightObservableBase<T> : IObservable<T>
  19. {
  20. private Exception _error;
  21. private List<IObserver<T>> _observers = new List<IObserver<T>>();
  22. public IDisposable Subscribe(IObserver<T> observer)
  23. {
  24. Contract.Requires<ArgumentNullException>(observer != null);
  25. Dispatcher.UIThread.VerifyAccess();
  26. var first = false;
  27. for (; ; )
  28. {
  29. if (Volatile.Read(ref _observers) == null)
  30. {
  31. if (_error != null)
  32. {
  33. observer.OnError(_error);
  34. }
  35. else
  36. {
  37. observer.OnCompleted();
  38. }
  39. return Disposable.Empty;
  40. }
  41. lock (this)
  42. {
  43. if (_observers == null)
  44. {
  45. continue;
  46. }
  47. first = _observers.Count == 0;
  48. _observers.Add(observer);
  49. break;
  50. }
  51. }
  52. if (first)
  53. {
  54. Initialize();
  55. }
  56. Subscribed(observer, first);
  57. return new RemoveObserver(this, observer);
  58. }
  59. void Remove(IObserver<T> observer)
  60. {
  61. if (Volatile.Read(ref _observers) != null)
  62. {
  63. lock (this)
  64. {
  65. var observers = _observers;
  66. if (observers != null)
  67. {
  68. observers.Remove(observer);
  69. if (observers.Count == 0)
  70. {
  71. observers.TrimExcess();
  72. Deinitialize();
  73. }
  74. }
  75. }
  76. }
  77. }
  78. sealed class RemoveObserver : IDisposable
  79. {
  80. LightweightObservableBase<T> _parent;
  81. IObserver<T> _observer;
  82. public RemoveObserver(LightweightObservableBase<T> parent, IObserver<T> observer)
  83. {
  84. _parent = parent;
  85. Volatile.Write(ref _observer, observer);
  86. }
  87. public void Dispose()
  88. {
  89. var observer = _observer;
  90. Interlocked.Exchange(ref _parent, null)?.Remove(observer);
  91. _observer = null;
  92. }
  93. }
  94. protected abstract void Initialize();
  95. protected abstract void Deinitialize();
  96. protected void PublishNext(T value)
  97. {
  98. if (Volatile.Read(ref _observers) != null)
  99. {
  100. IObserver<T>[] observers = null;
  101. IObserver<T> singleObserver = null;
  102. lock (this)
  103. {
  104. if (_observers == null)
  105. {
  106. return;
  107. }
  108. if (_observers.Count == 1)
  109. {
  110. singleObserver = _observers[0];
  111. }
  112. else
  113. {
  114. observers = _observers.ToArray();
  115. }
  116. }
  117. if (singleObserver != null)
  118. {
  119. singleObserver.OnNext(value);
  120. }
  121. else
  122. {
  123. foreach (var observer in observers)
  124. {
  125. observer.OnNext(value);
  126. }
  127. }
  128. }
  129. }
  130. protected void PublishCompleted()
  131. {
  132. if (Volatile.Read(ref _observers) != null)
  133. {
  134. IObserver<T>[] observers;
  135. lock (this)
  136. {
  137. if (_observers == null)
  138. {
  139. return;
  140. }
  141. observers = _observers.ToArray();
  142. Volatile.Write(ref _observers, null);
  143. }
  144. foreach (var observer in observers)
  145. {
  146. observer.OnCompleted();
  147. }
  148. Deinitialize();
  149. }
  150. }
  151. protected void PublishError(Exception error)
  152. {
  153. if (Volatile.Read(ref _observers) != null)
  154. {
  155. IObserver<T>[] observers;
  156. lock (this)
  157. {
  158. if (_observers == null)
  159. {
  160. return;
  161. }
  162. _error = error;
  163. observers = _observers.ToArray();
  164. Volatile.Write(ref _observers, null);
  165. }
  166. foreach (var observer in observers)
  167. {
  168. observer.OnError(error);
  169. }
  170. Deinitialize();
  171. }
  172. }
  173. protected virtual void Subscribed(IObserver<T> observer, bool first)
  174. {
  175. }
  176. }
  177. }