ListObservable.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. namespace System.Reactive
  10. {
  11. // CONSIDER: Deprecate this functionality or invest in an asynchronous variant.
  12. /// <summary>
  13. /// Represents an object that retains the elements of the observable sequence and signals the end of the sequence.
  14. /// </summary>
  15. /// <typeparam name="T">The type of elements received from the source sequence.</typeparam>
  16. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix", Justification = "By design; Observable suffix takes precedence.")]
  17. [Experimental]
  18. public class ListObservable<T> : IList<T>, IObservable<object>
  19. {
  20. private readonly IDisposable subscription;
  21. private readonly AsyncSubject<object> subject = new AsyncSubject<object>();
  22. private readonly List<T> results = new List<T>();
  23. /// <summary>
  24. /// Constructs an object that retains the values of source and signals the end of the sequence.
  25. /// </summary>
  26. /// <param name="source">The observable sequence whose elements will be retained in the list.</param>
  27. /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
  28. public ListObservable(IObservable<T> source)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. subscription = source.Subscribe(results.Add, subject.OnError, subject.OnCompleted);
  33. }
  34. private void Wait()
  35. {
  36. subject.DefaultIfEmpty().Wait();
  37. }
  38. /// <summary>
  39. /// Returns the last value of the observable sequence.
  40. /// </summary>
  41. public T Value
  42. {
  43. get
  44. {
  45. Wait();
  46. if (results.Count == 0)
  47. {
  48. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  49. }
  50. return results[results.Count - 1];
  51. }
  52. }
  53. /// <summary>
  54. /// Determines the index of a specific item in the ListObservable.
  55. /// </summary>
  56. /// <param name="item">The element to determine the index for.</param>
  57. /// <returns>The index of the specified item in the list; -1 if not found.</returns>
  58. public int IndexOf(T item)
  59. {
  60. Wait();
  61. return results.IndexOf(item);
  62. }
  63. /// <summary>
  64. /// Inserts an item to the ListObservable at the specified index.
  65. /// </summary>
  66. /// <param name="index">The index to insert the item at.</param>
  67. /// <param name="item">The item to insert in the list.</param>
  68. public void Insert(int index, T item)
  69. {
  70. Wait();
  71. results.Insert(index, item);
  72. }
  73. /// <summary>
  74. /// Removes the ListObservable item at the specified index.
  75. /// </summary>
  76. /// <param name="index">The index of the item to remove.</param>
  77. public void RemoveAt(int index)
  78. {
  79. Wait();
  80. results.RemoveAt(index);
  81. }
  82. /// <summary>
  83. /// Gets or sets the element at the specified index.
  84. /// </summary>
  85. /// <param name="index">The index of the item to retrieve or set.</param>
  86. public T this[int index]
  87. {
  88. get
  89. {
  90. Wait();
  91. return results[index];
  92. }
  93. set
  94. {
  95. Wait();
  96. results[index] = value;
  97. }
  98. }
  99. /// <summary>
  100. /// Adds an item to the ListObservable.
  101. /// </summary>
  102. /// <param name="item">The item to add to the list.</param>
  103. public void Add(T item)
  104. {
  105. Wait();
  106. results.Add(item);
  107. }
  108. /// <summary>
  109. /// Removes all items from the ListObservable.
  110. /// </summary>
  111. public void Clear()
  112. {
  113. Wait();
  114. results.Clear();
  115. }
  116. /// <summary>
  117. /// Determines whether the ListObservable contains a specific value.
  118. /// </summary>
  119. /// <param name="item">The item to search for in the list.</param>
  120. /// <returns>true if found; false otherwise.</returns>
  121. public bool Contains(T item)
  122. {
  123. Wait();
  124. return results.Contains(item);
  125. }
  126. /// <summary>
  127. /// Copies the elements of the ListObservable to an System.Array, starting at a particular System.Array index.
  128. /// </summary>
  129. /// <param name="array">The array to copy elements to.</param>
  130. /// <param name="arrayIndex">The start index in the array to start copying elements to.</param>
  131. public void CopyTo(T[] array, int arrayIndex)
  132. {
  133. Wait();
  134. results.CopyTo(array, arrayIndex);
  135. }
  136. /// <summary>
  137. /// Gets the number of elements contained in the ListObservable.
  138. /// </summary>
  139. public int Count
  140. {
  141. get
  142. {
  143. Wait();
  144. return results.Count;
  145. }
  146. }
  147. /// <summary>
  148. /// Gets a value that indicates whether the ListObservable is read-only.
  149. /// </summary>
  150. public bool IsReadOnly
  151. {
  152. get { return false; }
  153. }
  154. /// <summary>
  155. /// Removes the first occurrence of a specific object from the ListObservable.
  156. /// </summary>
  157. /// <param name="item">The item to remove from the list.</param>
  158. /// <returns>true if the item was found; false otherwise.</returns>
  159. public bool Remove(T item)
  160. {
  161. Wait();
  162. return results.Remove(item);
  163. }
  164. /// <summary>
  165. /// Returns an enumerator that iterates through the collection.
  166. /// </summary>
  167. /// <returns>Enumerator over the list.</returns>
  168. public IEnumerator<T> GetEnumerator()
  169. {
  170. Wait();
  171. return results.GetEnumerator();
  172. }
  173. IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
  174. /// <summary>
  175. /// Subscribes an observer to the ListObservable which will be notified upon completion.
  176. /// </summary>
  177. /// <param name="observer">The observer to send completion or error messages to.</param>
  178. /// <returns>The disposable resource that can be used to unsubscribe.</returns>
  179. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
  180. public IDisposable Subscribe(IObserver<object> observer)
  181. {
  182. if (observer == null)
  183. throw new ArgumentNullException(nameof(observer));
  184. return StableCompositeDisposable.Create(subscription, subject.Subscribe(observer));
  185. }
  186. }
  187. }