SequenceEqual.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  9. internal static class SequenceEqual<TSource>
  10. {
  11. internal sealed class Observable : Producer<bool, Observable._>
  12. {
  13. private readonly IObservable<TSource> _first;
  14. private readonly IObservable<TSource> _second;
  15. private readonly IEqualityComparer<TSource> _comparer;
  16. public Observable(IObservable<TSource> first, IObservable<TSource> second, IEqualityComparer<TSource> comparer)
  17. {
  18. _first = first;
  19. _second = second;
  20. _comparer = comparer;
  21. }
  22. protected override _ CreateSink(IObserver<bool> observer) => new _(_comparer, observer);
  23. protected override void Run(_ sink) => sink.Run(this);
  24. internal sealed class _ : IdentitySink<bool>
  25. {
  26. private readonly IEqualityComparer<TSource> _comparer;
  27. private readonly object _gate;
  28. private readonly Queue<TSource> _ql;
  29. private readonly Queue<TSource> _qr;
  30. public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer)
  31. : base(observer)
  32. {
  33. _comparer = comparer;
  34. _gate = new object();
  35. _ql = new Queue<TSource>();
  36. _qr = new Queue<TSource>();
  37. }
  38. private bool _donel;
  39. private bool _doner;
  40. private IDisposable _second;
  41. public void Run(Observable parent)
  42. {
  43. SetUpstream(parent._first.SubscribeSafe(new FirstObserver(this)));
  44. Disposable.SetSingle(ref _second, parent._second.SubscribeSafe(new SecondObserver(this)));
  45. }
  46. protected override void Dispose(bool disposing)
  47. {
  48. if (disposing)
  49. {
  50. Disposable.TryDispose(ref _second);
  51. }
  52. base.Dispose(disposing);
  53. }
  54. private sealed class FirstObserver : IObserver<TSource>
  55. {
  56. private readonly _ _parent;
  57. public FirstObserver(_ parent)
  58. {
  59. _parent = parent;
  60. }
  61. public void OnNext(TSource value)
  62. {
  63. lock (_parent._gate)
  64. {
  65. if (_parent._qr.Count > 0)
  66. {
  67. var equal = false;
  68. var v = _parent._qr.Dequeue();
  69. try
  70. {
  71. equal = _parent._comparer.Equals(value, v);
  72. }
  73. catch (Exception exception)
  74. {
  75. _parent.ForwardOnError(exception);
  76. return;
  77. }
  78. if (!equal)
  79. {
  80. _parent.ForwardOnNext(false);
  81. _parent.ForwardOnCompleted();
  82. }
  83. }
  84. else if (_parent._doner)
  85. {
  86. _parent.ForwardOnNext(false);
  87. _parent.ForwardOnCompleted();
  88. }
  89. else
  90. _parent._ql.Enqueue(value);
  91. }
  92. }
  93. public void OnError(Exception error)
  94. {
  95. lock (_parent._gate)
  96. {
  97. _parent.ForwardOnError(error);
  98. }
  99. }
  100. public void OnCompleted()
  101. {
  102. lock (_parent._gate)
  103. {
  104. _parent._donel = true;
  105. if (_parent._ql.Count == 0)
  106. {
  107. if (_parent._qr.Count > 0)
  108. {
  109. _parent.ForwardOnNext(false);
  110. _parent.ForwardOnCompleted();
  111. }
  112. else if (_parent._doner)
  113. {
  114. _parent.ForwardOnNext(true);
  115. _parent.ForwardOnCompleted();
  116. }
  117. }
  118. }
  119. }
  120. }
  121. private sealed class SecondObserver : IObserver<TSource>
  122. {
  123. private readonly _ _parent;
  124. public SecondObserver(_ parent)
  125. {
  126. _parent = parent;
  127. }
  128. public void OnNext(TSource value)
  129. {
  130. lock (_parent._gate)
  131. {
  132. if (_parent._ql.Count > 0)
  133. {
  134. var equal = false;
  135. var v = _parent._ql.Dequeue();
  136. try
  137. {
  138. equal = _parent._comparer.Equals(v, value);
  139. }
  140. catch (Exception exception)
  141. {
  142. _parent.ForwardOnError(exception);
  143. return;
  144. }
  145. if (!equal)
  146. {
  147. _parent.ForwardOnNext(false);
  148. _parent.ForwardOnCompleted();
  149. }
  150. }
  151. else if (_parent._donel)
  152. {
  153. _parent.ForwardOnNext(false);
  154. _parent.ForwardOnCompleted();
  155. }
  156. else
  157. _parent._qr.Enqueue(value);
  158. }
  159. }
  160. public void OnError(Exception error)
  161. {
  162. lock (_parent._gate)
  163. {
  164. _parent.ForwardOnError(error);
  165. }
  166. }
  167. public void OnCompleted()
  168. {
  169. lock (_parent._gate)
  170. {
  171. _parent._doner = true;
  172. if (_parent._qr.Count == 0)
  173. {
  174. if (_parent._ql.Count > 0)
  175. {
  176. _parent.ForwardOnNext(false);
  177. _parent.ForwardOnCompleted();
  178. }
  179. else if (_parent._donel)
  180. {
  181. _parent.ForwardOnNext(true);
  182. _parent.ForwardOnCompleted();
  183. }
  184. }
  185. }
  186. }
  187. }
  188. }
  189. }
  190. internal sealed class Enumerable : Producer<bool, Enumerable._>
  191. {
  192. private readonly IObservable<TSource> _first;
  193. private readonly IEnumerable<TSource> _second;
  194. private readonly IEqualityComparer<TSource> _comparer;
  195. public Enumerable(IObservable<TSource> first, IEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  196. {
  197. _first = first;
  198. _second = second;
  199. _comparer = comparer;
  200. }
  201. protected override _ CreateSink(IObserver<bool> observer) => new _(_comparer, observer);
  202. protected override void Run(_ sink) => sink.Run(this);
  203. internal sealed class _ : Sink<TSource, bool>
  204. {
  205. private readonly IEqualityComparer<TSource> _comparer;
  206. public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer)
  207. : base(observer)
  208. {
  209. _comparer = comparer;
  210. }
  211. private IEnumerator<TSource> _enumerator;
  212. private static readonly IEnumerator<TSource> DisposedEnumerator = MakeDisposedEnumerator();
  213. private static IEnumerator<TSource> MakeDisposedEnumerator()
  214. {
  215. yield break;
  216. }
  217. public void Run(Enumerable parent)
  218. {
  219. //
  220. // Notice the evaluation order of obtaining the enumerator and subscribing to the
  221. // observable sequence is reversed compared to the operator's signature. This is
  222. // required to make sure the enumerator is available as soon as the observer can
  223. // be called. Otherwise, we end up having a race for the initialization and use
  224. // of the _rightEnumerator field.
  225. //
  226. try
  227. {
  228. var enumerator = parent._second.GetEnumerator();
  229. if (Interlocked.CompareExchange(ref _enumerator, enumerator, null) != null)
  230. {
  231. enumerator.Dispose();
  232. return;
  233. }
  234. }
  235. catch (Exception exception)
  236. {
  237. ForwardOnError(exception);
  238. return;
  239. }
  240. SetUpstream(parent._first.SubscribeSafe(this));
  241. }
  242. protected override void Dispose(bool disposing)
  243. {
  244. if (disposing)
  245. {
  246. Interlocked.Exchange(ref _enumerator, DisposedEnumerator)?.Dispose();
  247. }
  248. base.Dispose(disposing);
  249. }
  250. public override void OnNext(TSource value)
  251. {
  252. var equal = false;
  253. try
  254. {
  255. if (_enumerator.MoveNext())
  256. {
  257. var current = _enumerator.Current;
  258. equal = _comparer.Equals(value, current);
  259. }
  260. }
  261. catch (Exception exception)
  262. {
  263. ForwardOnError(exception);
  264. return;
  265. }
  266. if (!equal)
  267. {
  268. ForwardOnNext(false);
  269. ForwardOnCompleted();
  270. }
  271. }
  272. public override void OnCompleted()
  273. {
  274. var hasNext = false;
  275. try
  276. {
  277. hasNext = _enumerator.MoveNext();
  278. }
  279. catch (Exception exception)
  280. {
  281. ForwardOnError(exception);
  282. return;
  283. }
  284. ForwardOnNext(!hasNext);
  285. ForwardOnCompleted();
  286. }
  287. }
  288. }
  289. }
  290. }