EnumerableEx.Buffering.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. namespace System.Linq
  8. {
  9. public static partial class EnumerableEx
  10. {
  /// <summary>
  /// Wraps the source sequence in a buffer whose enumerators all draw from one shared enumeration, so that each enumerator fetches the next element from the source sequence.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <returns>Buffer whose enumerators each retrieve the next element of the shared source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <example>
  /// var rng = Enumerable.Range(0, 10).Share();
  ///
  /// var e1 = rng.GetEnumerator(); // Both e1 and e2 will consume elements from
  /// var e2 = rng.GetEnumerator(); // the source sequence.
  ///
  /// Assert.IsTrue(e1.MoveNext());
  /// Assert.AreEqual(0, e1.Current);
  ///
  /// Assert.IsTrue(e1.MoveNext());
  /// Assert.AreEqual(1, e1.Current);
  ///
  /// Assert.IsTrue(e2.MoveNext()); // e2 "steals" element 2
  /// Assert.AreEqual(2, e2.Current);
  ///
  /// Assert.IsTrue(e1.MoveNext()); // e1 can't see element 2
  /// Assert.AreEqual(3, e1.Current);
  /// </example>
  public static IBuffer<TSource> Share<TSource>(this IEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // A single enumerator is obtained here and handed to the buffer.
      var enumerator = source.GetEnumerator();
      return new SharedBuffer<TSource>(enumerator);
  }
  /// <summary>
  /// Applies a selector function to a shared view of the source sequence; within the selector, each enumerator fetches the next element from the source sequence.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <typeparam name="TResult">Result sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="selector">Selector function with shared access to the source sequence for each enumerator.</param>
  /// <returns>Sequence obtained by applying the selector function to the shared view over the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IEnumerable<TResult> Share<TSource, TResult>(this IEnumerable<TSource> source, Func<IEnumerable<TSource>, IEnumerable<TResult>> selector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      // The shared buffer is created inside the factory, i.e. each time the factory delegate runs.
      return Create<TResult>(() =>
      {
          var shared = source.Share();
          return selector(shared).GetEnumerator();
      });
  }
  /// <summary>
  /// Buffer whose enumerators all advance one shared source enumerator, so each source element
  /// is yielded by exactly one of the enumerators obtained from this buffer.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  class SharedBuffer<T> : IBuffer<T>
  {
      // Dedicated lock object. Locking on _source is unsafe because Dispose sets that field to
      // null, which made a second Dispose call (or a racing MoveNext) execute lock (null) and throw.
      private readonly object _gate = new object();
      private IEnumerator<T> _source;
      private bool _disposed;

      /// <summary>
      /// Creates a buffer over the given source enumerator; the buffer disposes it in <see cref="Dispose"/>.
      /// </summary>
      /// <param name="source">Enumerator shared by all enumerators of this buffer.</param>
      public SharedBuffer(IEnumerator<T> source)
      {
          _source = source;
      }

      /// <summary>
      /// Returns an enumerator that pulls elements from the shared source enumerator.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
      public IEnumerator<T> GetEnumerator()
      {
          if (_disposed)
              throw new ObjectDisposedException("");

          return GetEnumerator_();
      }

      // Iterator body, kept separate so that the disposed check in GetEnumerator runs eagerly
      // instead of being deferred to the first MoveNext call.
      private IEnumerator<T> GetEnumerator_()
      {
          while (true)
          {
              var hasValue = default(bool);
              var current = default(T);

              lock (_gate)
              {
                  // Checked under the lock so that _source cannot be nulled between the check and its use.
                  if (_disposed)
                      throw new ObjectDisposedException("");

                  hasValue = _source.MoveNext();
                  if (hasValue)
                      current = _source.Current;
              }

              // Yield outside the lock so the consumer's code never runs while the gate is held.
              if (hasValue)
                  yield return current;
              else
                  break;
          }
      }

      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          if (_disposed)
              throw new ObjectDisposedException("");

          return GetEnumerator();
      }

      /// <summary>
      /// Disposes the source enumerator and marks the buffer as disposed. Calling this method again is a no-op.
      /// </summary>
      public void Dispose()
      {
          lock (_gate)
          {
              if (!_disposed)
              {
                  _source.Dispose();
                  _source = null;
              }

              _disposed = true;
          }
      }
  }
  /// <summary>
  /// Wraps the source sequence in a buffer whose enumerators each get access to the remainder of the sequence, starting from the buffer's current index at the time the enumerator is obtained.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <returns>Buffer whose enumerators retrieve elements from the shared source sequence, starting from the index at the point of obtaining the enumerator.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <example>
  /// var rng = Enumerable.Range(0, 10).Publish();
  ///
  /// var e1 = rng.GetEnumerator(); // e1 has a view on the source starting from element 0
  ///
  /// Assert.IsTrue(e1.MoveNext());
  /// Assert.AreEqual(0, e1.Current);
  ///
  /// Assert.IsTrue(e1.MoveNext());
  /// Assert.AreEqual(1, e1.Current);
  ///
  /// var e2 = rng.GetEnumerator();
  ///
  /// Assert.IsTrue(e2.MoveNext()); // e2 has a view on the source starting from element 2
  /// Assert.AreEqual(2, e2.Current);
  ///
  /// Assert.IsTrue(e1.MoveNext()); // e1 continues to enumerate over its view
  /// Assert.AreEqual(2, e1.Current);
  /// </example>
  public static IBuffer<TSource> Publish<TSource>(this IEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // A single enumerator is obtained here and handed to the buffer.
      var enumerator = source.GetEnumerator();
      return new PublishedBuffer<TSource>(enumerator);
  }
  /// <summary>
  /// Applies a selector function to a published view of the source sequence; within the selector, each enumerator obtains a view over a tail of the source sequence.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <typeparam name="TResult">Result sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="selector">Selector function with published access to the source sequence for each enumerator.</param>
  /// <returns>Sequence obtained by applying the selector function to the published view over the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IEnumerable<TResult> Publish<TSource, TResult>(this IEnumerable<TSource> source, Func<IEnumerable<TSource>, IEnumerable<TResult>> selector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      // The published buffer is created inside the factory, i.e. each time the factory delegate runs.
      return Create<TResult>(() =>
      {
          var published = source.Publish();
          return selector(published).GetEnumerator();
      });
  }
  /// <summary>
  /// Buffer over a single source enumerator in which every enumerator starts at the buffer's
  /// element count at the moment it was obtained and reads the elements from there on.
  /// Elements are kept in a reference-counted list sized by the number of enumerators handed out.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  class PublishedBuffer<T> : IBuffer<T>
  {
      // Dedicated lock object. Locking on _source is unsafe because Dispose sets that field to
      // null, which made a second Dispose call (or a racing MoveNext) execute lock (null) and throw.
      private readonly object _gate = new object();
      private IEnumerator<T> _source;
      private RefCountList<T> _buffer;
      private bool _stopped;      // set once the source has thrown; the source is not advanced afterwards
      private Exception _error;   // exception thrown by the source, rethrown to readers that reach the end of the buffer
      private bool _disposed;

      /// <summary>
      /// Creates a buffer over the given source enumerator; the buffer disposes it in <see cref="Dispose"/>.
      /// </summary>
      /// <param name="source">Enumerator feeding the buffer.</param>
      public PublishedBuffer(IEnumerator<T> source)
      {
          // Starts with zero readers; GetEnumerator bumps the reader count for each enumerator.
          _buffer = new RefCountList<T>(0);
          _source = source;
      }

      /// <summary>
      /// Returns an enumerator over the elements from the buffer's current count onwards.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
      public IEnumerator<T> GetEnumerator()
      {
          var i = default(int);

          lock (_gate)
          {
              // Checked under the lock so that _buffer cannot be nulled between the check and its use.
              if (_disposed)
                  throw new ObjectDisposedException("");

              i = _buffer.Count;
              _buffer.ReaderCount++;
          }

          return GetEnumerator_(i);
      }

      // Iterator body; i is the index of the next element this enumerator will read.
      private IEnumerator<T> GetEnumerator_(int i)
      {
          try
          {
              while (true)
              {
                  var hasValue = default(bool);
                  var current = default(T);

                  lock (_gate)
                  {
                      if (_disposed)
                          throw new ObjectDisposedException("");

                      if (i >= _buffer.Count)
                      {
                          // This reader is at the end of the buffer: advance the source on its behalf.
                          if (!_stopped)
                          {
                              try
                              {
                                  hasValue = _source.MoveNext();
                                  if (hasValue)
                                      current = _source.Current;
                              }
                              catch (Exception ex)
                              {
                                  // Remember the failure so later readers observe the same exception.
                                  _stopped = true;
                                  _error = ex;
                                  _source.Dispose();
                              }
                          }

                          if (_stopped)
                          {
                              if (_error != null)
                                  throw _error;
                              else
                                  break;
                          }

                          if (hasValue)
                          {
                              _buffer.Add(current);
                          }
                      }
                      else
                      {
                          // The element was already buffered by another reader.
                          hasValue = true;
                      }

                      // Read through the buffer while still holding the lock: the indexer updates
                      // the element's reference count, and _buffer may be nulled by Dispose.
                      if (hasValue)
                          current = _buffer[i];
                  }

                  // Yield outside the lock so the consumer's code never runs while the gate is held.
                  if (hasValue)
                      yield return current;
                  else
                      break;

                  i++;
              }
          }
          finally
          {
              // Release this reader's claim on the elements it did not read.
              lock (_gate)
              {
                  if (_buffer != null)
                      _buffer.Done(i + 1);
              }
          }
      }

      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          if (_disposed)
              throw new ObjectDisposedException("");

          return GetEnumerator();
      }

      /// <summary>
      /// Disposes the source enumerator, clears the buffer and marks this instance as disposed.
      /// Calling this method again is a no-op.
      /// </summary>
      public void Dispose()
      {
          lock (_gate)
          {
              if (!_disposed)
              {
                  _source.Dispose();
                  _source = null;
                  _buffer.Clear();
                  _buffer = null;
              }

              _disposed = true;
          }
      }
  }
  /// <summary>
  /// Wraps the source sequence in a buffer whose enumerators each get access to all of the sequence's elements without causing multiple enumerations over the source.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <returns>Buffer whose enumerators retrieve all elements from the shared source sequence, without duplicating source enumeration side-effects.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <example>
  /// var rng = Enumerable.Range(0, 10).Do(x => Console.WriteLine(x)).Memoize();
  ///
  /// var e1 = rng.GetEnumerator();
  ///
  /// Assert.IsTrue(e1.MoveNext()); // Prints 0
  /// Assert.AreEqual(0, e1.Current);
  ///
  /// Assert.IsTrue(e1.MoveNext()); // Prints 1
  /// Assert.AreEqual(1, e1.Current);
  ///
  /// var e2 = rng.GetEnumerator();
  ///
  /// Assert.IsTrue(e2.MoveNext()); // Doesn't print anything; the side-effect of Do
  /// Assert.AreEqual(0, e2.Current); // has already taken place during e1's iteration.
  ///
  /// Assert.IsTrue(e1.MoveNext()); // Prints 2
  /// Assert.AreEqual(2, e1.Current);
  /// </example>
  public static IBuffer<TSource> Memoize<TSource>(this IEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // A single enumerator is obtained here and handed to the buffer.
      var enumerator = source.GetEnumerator();
      return new MemoizedBuffer<TSource>(enumerator);
  }
  /// <summary>
  /// Applies a selector function to a memoized view of the source sequence; within the selector, each enumerator can access all of the sequence's elements without causing multiple enumerations over the source.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <typeparam name="TResult">Result sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="selector">Selector function with memoized access to the source sequence for each enumerator.</param>
  /// <returns>Sequence obtained by applying the selector function to the memoized view over the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IEnumerable<TResult> Memoize<TSource, TResult>(this IEnumerable<TSource> source, Func<IEnumerable<TSource>, IEnumerable<TResult>> selector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      // The memoized buffer is created inside the factory, i.e. each time the factory delegate runs.
      return Create<TResult>(() =>
      {
          var memoized = source.Memoize();
          return selector(memoized).GetEnumerator();
      });
  }
  /// <summary>
  /// Wraps the source sequence in a buffer that lets a specified number of enumerators access all of the sequence's elements without causing multiple enumerations over the source.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="readerCount">Number of enumerators that can access the underlying buffer. Once every enumerator has obtained an element from the buffer, the element is removed from the buffer.</param>
  /// <returns>Buffer that lets the specified number of enumerators retrieve all elements from the shared source sequence, without duplicating source enumeration side-effects.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="readerCount"/> is zero or negative.</exception>
  public static IBuffer<TSource> Memoize<TSource>(this IEnumerable<TSource> source, int readerCount)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (readerCount <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(readerCount));
      }

      // A single enumerator is obtained here and handed to the buffer.
      var enumerator = source.GetEnumerator();
      return new MemoizedBuffer<TSource>(enumerator, readerCount);
  }
  /// <summary>
  /// Applies a selector function to a memoized view of the source sequence; within the selector, a specified number of enumerators can access all of the sequence's elements without causing multiple enumerations over the source.
  /// </summary>
  /// <typeparam name="TSource">Source sequence element type.</typeparam>
  /// <typeparam name="TResult">Result sequence element type.</typeparam>
  /// <param name="source">Source sequence.</param>
  /// <param name="readerCount">Number of enumerators that can access the underlying buffer. Once every enumerator has obtained an element from the buffer, the element is removed from the buffer.</param>
  /// <param name="selector">Selector function with memoized access to the source sequence for a specified number of enumerators.</param>
  /// <returns>Sequence obtained by applying the selector function to the memoized view over the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="readerCount"/> is zero or negative.</exception>
  public static IEnumerable<TResult> Memoize<TSource, TResult>(this IEnumerable<TSource> source, int readerCount, Func<IEnumerable<TSource>, IEnumerable<TResult>> selector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (readerCount <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(readerCount));
      }

      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      // The memoized buffer is created inside the factory, i.e. each time the factory delegate runs.
      return Create<TResult>(() =>
      {
          var memoized = source.Memoize(readerCount);
          return selector(memoized).GetEnumerator();
      });
  }
  /// <summary>
  /// Buffer over a single source enumerator in which every enumerator starts at index 0 and
  /// reads elements through a shared list, so the source is advanced at most once per element.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  class MemoizedBuffer<T> : IBuffer<T>
  {
      // Dedicated lock object. Locking on _source is unsafe because Dispose sets that field to
      // null, which made a second Dispose call (or a racing MoveNext) execute lock (null) and throw.
      private readonly object _gate = new object();
      private IEnumerator<T> _source;
      private IRefCountList<T> _buffer;
      private bool _stopped;      // set once the source has thrown; the source is not advanced afterwards
      private Exception _error;   // exception thrown by the source, rethrown to readers that reach the end of the buffer
      private bool _disposed;

      /// <summary>
      /// Creates a buffer backed by a <see cref="MaxRefCountList{T}"/>.
      /// </summary>
      /// <param name="source">Enumerator feeding the buffer.</param>
      public MemoizedBuffer(IEnumerator<T> source)
          : this(source, new MaxRefCountList<T>())
      {
      }

      /// <summary>
      /// Creates a buffer backed by a <see cref="RefCountList{T}"/> initialized with the given reader count.
      /// </summary>
      /// <param name="source">Enumerator feeding the buffer.</param>
      /// <param name="readerCount">Reader count passed to the reference-counted list.</param>
      public MemoizedBuffer(IEnumerator<T> source, int readerCount)
          : this(source, new RefCountList<T>(readerCount))
      {
      }

      private MemoizedBuffer(IEnumerator<T> source, IRefCountList<T> buffer)
      {
          _source = source;
          _buffer = buffer;
      }

      /// <summary>
      /// Returns an enumerator over the buffered sequence, starting at the first element.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
      public IEnumerator<T> GetEnumerator()
      {
          if (_disposed)
              throw new ObjectDisposedException("");

          return GetEnumerator_();
      }

      // Iterator body, kept separate so that the disposed check in GetEnumerator runs eagerly
      // instead of being deferred to the first MoveNext call.
      private IEnumerator<T> GetEnumerator_()
      {
          // Index of the next element this enumerator will read.
          var i = 0;

          try
          {
              while (true)
              {
                  var hasValue = default(bool);
                  var current = default(T);

                  lock (_gate)
                  {
                      // Checked under the lock so that _source/_buffer cannot be nulled between the check and their use.
                      if (_disposed)
                          throw new ObjectDisposedException("");

                      if (i >= _buffer.Count)
                      {
                          // This reader is at the end of the buffer: advance the source on its behalf.
                          if (!_stopped)
                          {
                              try
                              {
                                  hasValue = _source.MoveNext();
                                  if (hasValue)
                                      current = _source.Current;
                              }
                              catch (Exception ex)
                              {
                                  // Remember the failure so later readers observe the same exception.
                                  _stopped = true;
                                  _error = ex;
                                  _source.Dispose();
                              }
                          }

                          if (_stopped)
                          {
                              if (_error != null)
                                  throw _error;
                              else
                                  break;
                          }

                          if (hasValue)
                          {
                              _buffer.Add(current);
                          }
                      }
                      else
                      {
                          // The element was already buffered by another reader.
                          hasValue = true;
                      }

                      // Read through the buffer while still holding the lock: the indexer may update
                      // shared bookkeeping, and _buffer may be nulled by Dispose.
                      if (hasValue)
                          current = _buffer[i];
                  }

                  // Yield outside the lock so the consumer's code never runs while the gate is held.
                  if (hasValue)
                      yield return current;
                  else
                      break;

                  i++;
              }
          }
          finally
          {
              // Tell the buffer this reader is finished, starting from the element after index i.
              lock (_gate)
              {
                  if (_buffer != null)
                      _buffer.Done(i + 1);
              }
          }
      }

      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          if (_disposed)
              throw new ObjectDisposedException("");

          return GetEnumerator();
      }

      /// <summary>
      /// Disposes the source enumerator, clears the buffer and marks this instance as disposed.
      /// Calling this method again is a no-op.
      /// </summary>
      public void Dispose()
      {
          lock (_gate)
          {
              if (!_disposed)
              {
                  _source.Dispose();
                  _source = null;
                  _buffer.Clear();
                  _buffer = null;
              }

              _disposed = true;
          }
      }
  }
  449. }
  /// <summary>
  /// Disposable enumerable sequence that serves as a buffer exposing a shared view over an underlying enumerable sequence.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  public interface IBuffer<
#if !NO_VARIANCE && !SILVERLIGHT4 // SL4 has defined IEnumerable with invariant T
      out
#endif
      T> : IEnumerable<T>, IDisposable
  { }
  /// <summary>
  /// Append-only, index-addressable element store used by the buffer implementations.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  interface IRefCountList<T>
  {
      /// <summary>Removes the stored elements.</summary>
      void Clear();

      /// <summary>Gets the list's element count.</summary>
      int Count { get; }

      /// <summary>Gets the element stored at index <paramref name="i"/>.</summary>
      T this[int i] { get; }

      /// <summary>Appends an element to the list.</summary>
      void Add(T item);

      /// <summary>Signals that a reader is finished; <paramref name="index"/> is the first index it will not read.</summary>
      void Done(int index);
  }
  /// <summary>
  /// <see cref="IRefCountList{T}"/> implementation that forwards to a plain <see cref="List{T}"/>
  /// and keeps every element until <see cref="Clear"/> is called; <see cref="Done"/> does nothing.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  class MaxRefCountList<T> : IRefCountList<T>
  {
      private readonly IList<T> _list = new List<T>();

      /// <summary>Removes all elements from the underlying list.</summary>
      public void Clear() => _list.Clear();

      /// <summary>Gets the number of elements in the underlying list.</summary>
      public int Count => _list.Count;

      /// <summary>Gets the element at index <paramref name="i"/> of the underlying list.</summary>
      public T this[int i] => _list[i];

      /// <summary>Appends <paramref name="item"/> to the underlying list.</summary>
      public void Add(T item) => _list.Add(item);

      /// <summary>Intentionally empty: elements are never released when a reader finishes.</summary>
      public void Done(int index)
      {
      }
  }
  /// <summary>
  /// <see cref="IRefCountList{T}"/> implementation that stores each element together with a
  /// remaining-read counter and drops the element once that counter reaches zero.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  class RefCountList<T> : IRefCountList<T>
  {
      private int _readerCount;
      private readonly IDictionary<int, RefCount> _list;
      private int _count;

      /// <summary>Creates an empty list whose new elements start with <paramref name="readerCount"/> remaining reads.</summary>
      public RefCountList(int readerCount)
      {
          _readerCount = readerCount;
          _list = new Dictionary<int, RefCount>();
      }

      /// <summary>Gets or sets the counter value assigned to elements added from now on.</summary>
      public int ReaderCount
      {
          get { return _readerCount; }
          set { _readerCount = value; }
      }

      /// <summary>Removes all stored entries; the value reported by <see cref="Count"/> is left unchanged.</summary>
      public void Clear()
      {
          _list.Clear();
      }

      /// <summary>Gets the number of elements added so far, including ones that have since been removed.</summary>
      public int Count => _count;

      /// <summary>
      /// Reads the element at index <paramref name="i"/> and decrements its remaining-read counter,
      /// removing the entry when the counter reaches zero.
      /// </summary>
      /// <exception cref="InvalidOperationException">No entry is stored under index <paramref name="i"/>.</exception>
      public T this[int i]
      {
          get
          {
              Debug.Assert(i < _count);

              RefCount entry;
              if (!_list.TryGetValue(i, out entry))
              {
                  throw new InvalidOperationException("Element no longer available in the buffer.");
              }

              var value = entry.Value;

              // One read has been used up; drop the entry after its last read.
              entry.Count--;
              if (entry.Count == 0)
              {
                  _list.Remove(i);
              }

              return value;
          }
      }

      /// <summary>Stores <paramref name="item"/> at the next index with the current reader count as its counter.</summary>
      public void Add(T item)
      {
          var entry = new RefCount { Value = item, Count = _readerCount };
          _list[_count] = entry;
          _count++;
      }

      /// <summary>
      /// Reads every element from <paramref name="index"/> up to the end, discarding the values so that
      /// only their counters are decremented, then decrements the reader count.
      /// </summary>
      public void Done(int index)
      {
          var position = index;
          while (position < _count)
          {
              // The indexer is invoked purely for its counter-decrementing side effect.
              var discarded = this[position];
              position++;
          }

          _readerCount--;
      }

      // Mutable holder pairing a value with its remaining-read counter.
      class RefCount
      {
          public int Count;
          public T Value;
      }
  }
  557. }