// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class EnumerableEx
    {
        /// <summary>
        /// Creates a buffer with a view over the source sequence, causing each enumerator to obtain access to the remainder of
        /// the sequence from the current index in the buffer.
        /// </summary>
        /// <typeparam name="TSource">Source sequence element type.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>
        /// Buffer enabling each enumerator to retrieve elements from the shared source sequence, starting from the index
        /// at the point of obtaining the enumerator.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <example>
        /// <code>
        /// var rng = Enumerable.Range(0, 10).Publish();
        /// var e1 = rng.GetEnumerator();    // e1 has a view on the source starting from element 0
        /// Assert.IsTrue(e1.MoveNext());
        /// Assert.AreEqual(0, e1.Current);
        /// Assert.IsTrue(e1.MoveNext());
        /// Assert.AreEqual(1, e1.Current);
        /// var e2 = rng.GetEnumerator();
        /// Assert.IsTrue(e2.MoveNext());    // e2 has a view on the source starting from element 2
        /// Assert.AreEqual(2, e2.Current);
        /// Assert.IsTrue(e1.MoveNext());    // e1 continues to enumerate over its view
        /// Assert.AreEqual(2, e1.Current);
        /// </code>
        /// </example>
        public static IBuffer<TSource> Publish<TSource>(this IEnumerable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            // The source enumerator is obtained eagerly and is owned by the returned buffer,
            // which disposes it from its own Dispose().
            return new PublishedBuffer<TSource>(source.GetEnumerator());
        }

        /// <summary>
        /// Publishes the source sequence within a selector function where each enumerator can obtain a view over a tail of the
        /// source sequence.
        /// </summary>
        /// <typeparam name="TSource">Source sequence element type.</typeparam>
        /// <typeparam name="TResult">Result sequence element type.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="selector">Selector function with published access to the source sequence for each enumerator.</param>
        /// <returns>Sequence resulting from applying the selector function to the published view over the source sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IEnumerable<TResult> Publish<TSource, TResult>(this IEnumerable<TSource> source, Func<IBuffer<TSource>, IEnumerable<TResult>> selector)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (selector == null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            // The factory passed to Create publishes the source and applies the selector each time it is invoked.
            return Create(() => selector(source.Publish()).GetEnumerator());
        }

        /// <summary>
        /// Buffer that hands out enumerators over one shared source enumerator. Each enumerator starts at the
        /// index equal to the number of elements buffered at the moment it was obtained.
        /// </summary>
        /// <typeparam name="T">Element type.</typeparam>
        private class PublishedBuffer<T> : IBuffer<T>
        {
            // Dedicated lock object. The source enumerator must not be used as the lock target because
            // Dispose() sets that field to null, and locking on a null reference throws.
            private readonly object _gate = new object();

            private RefCountList<T> _buffer;
            private bool _disposed;
            private Exception _error;
            private IEnumerator<T> _source;
            private bool _stopped;

            /// <summary>
            /// Creates a buffer over the given source enumerator, starting with an empty element list.
            /// </summary>
            /// <param name="source">Enumerator of the source sequence; disposed by <see cref="Dispose"/>.</param>
            public PublishedBuffer(IEnumerator<T> source)
            {
                _buffer = new RefCountList<T>(0);
                _source = source;
            }

            /// <summary>
            /// Obtains an enumerator whose view starts at the current end of the buffer.
            /// </summary>
            /// <returns>Enumerator over the elements from the current buffer index onwards.</returns>
            /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
            public IEnumerator<T> GetEnumerator()
            {
                int start;

                lock (_gate)
                {
                    // Checked under the gate so a concurrent Dispose() cannot null the buffer
                    // between the check and the accesses below.
                    if (_disposed)
                    {
                        throw new ObjectDisposedException("");
                    }

                    start = _buffer.Count;

                    // Registers one more reader with the list; how the count is used is defined by
                    // RefCountList, which is declared elsewhere.
                    _buffer.ReaderCount++;
                }

                return GetEnumerator_(start);
            }

            /// <summary>
            /// Non-generic counterpart of <see cref="GetEnumerator"/>.
            /// </summary>
            /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
            IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

            /// <summary>
            /// Disposes the source enumerator and clears and releases the buffer. Subsequent calls do nothing.
            /// </summary>
            public void Dispose()
            {
                lock (_gate)
                {
                    if (!_disposed)
                    {
                        _source.Dispose();
                        _source = null;

                        _buffer.Clear();
                        _buffer = null;
                    }

                    _disposed = true;
                }
            }

            /// <summary>
            /// Iterator behind <see cref="GetEnumerator"/>: yields buffered elements from index
            /// <paramref name="i"/> onwards, pulling from the shared source when the buffer is exhausted.
            /// </summary>
            /// <param name="i">Buffer index at which this enumerator starts.</param>
            /// <returns>Enumerator over the buffer starting at <paramref name="i"/>.</returns>
            private IEnumerator<T> GetEnumerator_(int i)
            {
                try
                {
                    while (true)
                    {
                        T item;

                        lock (_gate)
                        {
                            if (_disposed)
                            {
                                throw new ObjectDisposedException("");
                            }

                            if (i >= _buffer.Count)
                            {
                                // This reader has caught up with the buffer: advance the shared source.
                                var hasValue = false;
                                var current = default(T);

                                if (!_stopped)
                                {
                                    try
                                    {
                                        hasValue = _source.MoveNext();
                                        if (hasValue)
                                        {
                                            current = _source.Current;
                                        }
                                    }
                                    catch (Exception ex)
                                    {
                                        // Remember the failure so every enumerator reaching this
                                        // point observes the same exception.
                                        _stopped = true;
                                        _error = ex;

                                        _source.Dispose();
                                    }
                                }

                                if (_stopped)
                                {
                                    if (_error != null)
                                    {
                                        throw _error;
                                    }

                                    break;
                                }

                                if (!hasValue)
                                {
                                    // Source is exhausted.
                                    break;
                                }

                                _buffer.Add(current);
                            }

                            // Read while still holding the gate so the access cannot race with
                            // Add() from another enumerator or with Dispose() releasing the buffer.
                            item = _buffer[i];
                        }

                        yield return item;

                        i++;
                    }
                }
                finally
                {
                    // Tells the buffer this reader is finished, passing i + 1; what the list does with
                    // that index is defined by RefCountList. Skipped once Dispose() has released the buffer.
                    lock (_gate)
                    {
                        _buffer?.Done(i + 1);
                    }
                }
            }
        }
    }
}