瀏覽代碼

Merge pull request #229 from Reactive-Extensions/rename-create

Rename create methods
Oren Novotny 9 年之前
父節點
當前提交
7ec90b9651
共有 31 個文件被更改,包括 1532 次插入1494 次删除
  1. 15 12
      Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.cs
  2. 64 63
      Ix.NET/Source/System.Interactive.Async/Buffer.cs
  3. 90 88
      Ix.NET/Source/System.Interactive.Async/Catch.cs
  4. 88 86
      Ix.NET/Source/System.Interactive.Async/Concatenate.cs
  5. 49 57
      Ix.NET/Source/System.Interactive.Async/Create.cs
  6. 39 38
      Ix.NET/Source/System.Interactive.Async/DefaultIfEmpty.cs
  7. 3 2
      Ix.NET/Source/System.Interactive.Async/Defer.cs
  8. 50 49
      Ix.NET/Source/System.Interactive.Async/Distinct.cs
  9. 46 45
      Ix.NET/Source/System.Interactive.Async/Do.cs
  10. 33 32
      Ix.NET/Source/System.Interactive.Async/Except.cs
  11. 45 44
      Ix.NET/Source/System.Interactive.Async/Expand.cs
  12. 14 13
      Ix.NET/Source/System.Interactive.Async/Finally.cs
  13. 38 37
      Ix.NET/Source/System.Interactive.Async/Generate.cs
  14. 3 3
      Ix.NET/Source/System.Interactive.Async/Grouping.cs
  15. 24 23
      Ix.NET/Source/System.Interactive.Async/IgnoreElements.cs
  16. 37 36
      Ix.NET/Source/System.Interactive.Async/Intersect.cs
  17. 2 2
      Ix.NET/Source/System.Interactive.Async/Join.cs
  18. 47 46
      Ix.NET/Source/System.Interactive.Async/OnErrorResumeNext.cs
  19. 4 4
      Ix.NET/Source/System.Interactive.Async/OrderBy.cs
  20. 91 89
      Ix.NET/Source/System.Interactive.Async/Repeat.cs
  21. 30 28
      Ix.NET/Source/System.Interactive.Async/Reverse.cs
  22. 78 76
      Ix.NET/Source/System.Interactive.Async/Scan.cs
  23. 47 45
      Ix.NET/Source/System.Interactive.Async/Select.cs
  24. 90 88
      Ix.NET/Source/System.Interactive.Async/SelectMany.cs
  25. 145 141
      Ix.NET/Source/System.Interactive.Async/Skip.cs
  26. 129 125
      Ix.NET/Source/System.Interactive.Async/Take.cs
  27. 54 51
      Ix.NET/Source/System.Interactive.Async/ToAsyncEnumerable.cs
  28. 57 56
      Ix.NET/Source/System.Interactive.Async/ToObservable.cs
  29. 45 44
      Ix.NET/Source/System.Interactive.Async/Using.cs
  30. 53 51
      Ix.NET/Source/System.Interactive.Async/Where.cs
  31. 22 20
      Ix.NET/Source/System.Interactive.Async/Zip.cs

+ 15 - 12
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.cs

@@ -22,10 +22,11 @@ namespace System.Linq
 
         public static IAsyncEnumerable<TValue> Empty<TValue>()
         {
-            return Create(() => Create<TValue>(
-                              ct => TaskExt.False,
-                              () => { throw new InvalidOperationException(); },
-                              () => { })
+            return CreateEnumerable(
+                () => CreateEnumerator<TValue>(
+                    ct => TaskExt.False,
+                    () => { throw new InvalidOperationException(); },
+                    () => { })
             );
         }
 
@@ -47,10 +48,11 @@ namespace System.Linq
 
         public static IAsyncEnumerable<TValue> Never<TValue>()
         {
-            return Create(() => Create<TValue>(
-                              (ct, tcs) => tcs.Task,
-                              () => { throw new InvalidOperationException(); },
-                              () => { })
+            return CreateEnumerable(
+                () => CreateEnumerator<TValue>(
+                    (ct, tcs) => tcs.Task,
+                    () => { throw new InvalidOperationException(); },
+                    () => { })
             );
         }
 
@@ -65,10 +67,11 @@ namespace System.Linq
             if (exception == null)
                 throw new ArgumentNullException(nameof(exception));
 
-            return Create(() => Create<TValue>(
-                              ct => TaskExt.Throw<bool>(exception),
-                              () => { throw new InvalidOperationException(); },
-                              () => { })
+            return CreateEnumerable(
+                () => CreateEnumerator<TValue>(
+                    ct => TaskExt.Throw<bool>(exception),
+                    () => { throw new InvalidOperationException(); },
+                    () => { })
             );
         }
 

+ 64 - 63
Ix.NET/Source/System.Interactive.Async/Buffer.cs

@@ -1,6 +1,6 @@
-// // Licensed to the .NET Foundation under one or more agreements.
-// // The .NET Foundation licenses this file to you under the Apache 2.0 License.
-// // See the LICENSE file in the project root for more information. 
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
 
 using System;
 using System.Collections.Generic;
@@ -36,66 +36,67 @@ namespace System.Linq
 
         private static IAsyncEnumerable<IList<TSource>> Buffer_<TSource>(this IAsyncEnumerable<TSource> source, int count, int skip)
         {
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var buffers = new Queue<IList<TSource>>();
-
-                              var i = 0;
-
-                              var current = default(IList<TSource>);
-                              var stopped = false;
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (!stopped)
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              var item = e.Current;
-
-                                              if (i++%skip == 0)
-                                                  buffers.Enqueue(new List<TSource>(count));
-
-                                              foreach (var buffer in buffers)
-                                                  buffer.Add(item);
-
-                                              if (buffers.Count > 0 && buffers.Peek()
-                                                                              .Count == count)
-                                              {
-                                                  current = buffers.Dequeue();
-                                                  return true;
-                                              }
-                                              return await f(ct)
-                                                         .ConfigureAwait(false);
-                                          }
-                                          stopped = true;
-                                          e.Dispose();
-
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      if (buffers.Count > 0)
-                                      {
-                                          current = buffers.Dequeue();
-                                          return true;
-                                      }
-                                      return false;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var buffers = new Queue<IList<TSource>>();
+
+                    var i = 0;
+
+                    var current = default(IList<TSource>);
+                    var stopped = false;
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (!stopped)
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    var item = e.Current;
+
+                                    if (i++%skip == 0)
+                                        buffers.Enqueue(new List<TSource>(count));
+
+                                    foreach (var buffer in buffers)
+                                        buffer.Add(item);
+
+                                    if (buffers.Count > 0 && buffers.Peek()
+                                                                    .Count == count)
+                                    {
+                                        current = buffers.Dequeue();
+                                        return true;
+                                    }
+                                    return await f(ct)
+                                               .ConfigureAwait(false);
+                                }
+                                stopped = true;
+                                e.Dispose();
+
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            if (buffers.Count > 0)
+                            {
+                                current = buffers.Dequeue();
+                                return true;
+                            }
+                            return false;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 90 - 88
Ix.NET/Source/System.Interactive.Async/Catch.cs

@@ -21,50 +21,51 @@ namespace System.Linq
             if (handler == null)
                 throw new ArgumentNullException(nameof(handler));
 
-            return Create(() =>
-            {
-                var e = source.GetEnumerator();
-
-                var cts = new CancellationTokenDisposable();
-                var a = new AssignableDisposable
+            return CreateEnumerable(
+                () =>
                 {
-                    Disposable = e
-                };
-                var d = Disposable.Create(cts, a);
-                var done = false;
+                    var e = source.GetEnumerator();
 
-                var f = default(Func<CancellationToken, Task<bool>>);
-                f = async ct =>
-                {
-                    if (!done)
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable
                     {
-                        try
+                        Disposable = e
+                    };
+                    var d = Disposable.Create(cts, a);
+                    var done = false;
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
                         {
+                            if (!done)
+                            {
+                                try
+                                {
+                                    return await e.MoveNext(ct)
+                                                  .ConfigureAwait(false);
+                                }
+                                catch (TException ex)
+                                {
+                                    var err = handler(ex)
+                                        .GetEnumerator();
+                                    e = err;
+                                    a.Disposable = e;
+                                    done = true;
+                                    return await f(ct)
+                                               .ConfigureAwait(false);
+                                }
+                            }
                             return await e.MoveNext(ct)
                                           .ConfigureAwait(false);
-                        }
-                        catch (TException ex)
-                        {
-                            var err = handler(ex)
-                                .GetEnumerator();
-                            e = err;
-                            a.Disposable = e;
-                            done = true;
-                            return await f(ct)
-                                       .ConfigureAwait(false);
-                        }
-                    }
-                    return await e.MoveNext(ct)
-                                  .ConfigureAwait(false);
-                };
-
-                return Create(
-                    f,
-                    () => e.Current,
-                    d.Dispose,
-                    a
-                );
-            });
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        a
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
@@ -90,64 +91,65 @@ namespace System.Linq
             if (second == null)
                 throw new ArgumentNullException(nameof(second));
 
-            return new[] { first, second }.Catch_();
+            return new[] {first, second}.Catch_();
         }
 
         private static IAsyncEnumerable<TSource> Catch_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
-            return Create(() =>
-            {
-                var se = sources.GetEnumerator();
-                var e = default(IAsyncEnumerator<TSource>);
-
-                var cts = new CancellationTokenDisposable();
-                var a = new AssignableDisposable();
-                var d = Disposable.Create(cts, se, a);
-
-                var error = default(ExceptionDispatchInfo);
-
-                var f = default(Func<CancellationToken, Task<bool>>);
-                f = async ct =>
+            return CreateEnumerable(
+                () =>
                 {
-                    if (e == null)
-                    {
-                        if (se.MoveNext())
-                        {
-                            e = se.Current.GetEnumerator();
-                        }
-                        else
-                        {
-                            error?.Throw();
-                            return false;
-                        }
+                    var se = sources.GetEnumerator();
+                    var e = default(IAsyncEnumerator<TSource>);
 
-                        error = null;
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable();
+                    var d = Disposable.Create(cts, se, a);
 
-                        a.Disposable = e;
-                    }
+                    var error = default(ExceptionDispatchInfo);
 
-                    try
-                    {
-                        return await e.MoveNext(ct)
-                                      .ConfigureAwait(false);
-                    }
-                    catch (Exception exception)
-                    {
-                        e.Dispose();
-                        e = null;
-                        error = ExceptionDispatchInfo.Capture(exception);
-                        return await f(ct)
-                                   .ConfigureAwait(false);
-                    }
-                };
-
-                return Create(
-                    f,
-                    () => e.Current,
-                    d.Dispose,
-                    a
-                );
-            });
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                if (se.MoveNext())
+                                {
+                                    e = se.Current.GetEnumerator();
+                                }
+                                else
+                                {
+                                    error?.Throw();
+                                    return false;
+                                }
+
+                                error = null;
+
+                                a.Disposable = e;
+                            }
+
+                            try
+                            {
+                                return await e.MoveNext(ct)
+                                              .ConfigureAwait(false);
+                            }
+                            catch (Exception exception)
+                            {
+                                e.Dispose();
+                                e = null;
+                                error = ExceptionDispatchInfo.Capture(exception);
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        a
+                    );
+                });
         }
     }
 }

+ 88 - 86
Ix.NET/Source/System.Interactive.Async/Concatenate.cs

@@ -19,46 +19,47 @@ namespace System.Linq
             if (second == null)
                 throw new ArgumentNullException(nameof(second));
 
-            return Create(() =>
-                          {
-                              var switched = false;
-                              var e = first.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var a = new AssignableDisposable
-                              {
-                                  Disposable = e
-                              };
-                              var d = Disposable.Create(cts, a);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          return true;
-                                      }
-                                      if (switched)
-                                      {
-                                          return false;
-                                      }
-                                      switched = true;
-
-                                      e = second.GetEnumerator();
-                                      a.Disposable = e;
-
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var switched = false;
+                    var e = first.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable
+                    {
+                        Disposable = e
+                    };
+                    var d = Disposable.Create(cts, a);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                return true;
+                            }
+                            if (switched)
+                            {
+                                return false;
+                            }
+                            switched = true;
+
+                            e = second.GetEnumerator();
+                            a.Disposable = e;
+
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
@@ -79,52 +80,53 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> Concat_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
-            return Create(() =>
-                          {
-                              var se = sources.GetEnumerator();
-                              var e = default(IAsyncEnumerator<TSource>);
-
-                              var cts = new CancellationTokenDisposable();
-                              var a = new AssignableDisposable();
-                              var d = Disposable.Create(cts, se, a);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (e == null)
-                                      {
-                                          var b = false;
-                                          b = se.MoveNext();
-                                          if (b)
-                                              e = se.Current.GetEnumerator();
-
-                                          if (!b)
-                                          {
-                                              return false;
-                                          }
-
-                                          a.Disposable = e;
-                                      }
-
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          return true;
-                                      }
-                                      e.Dispose();
-                                      e = null;
-
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  a
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var se = sources.GetEnumerator();
+                    var e = default(IAsyncEnumerator<TSource>);
+
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable();
+                    var d = Disposable.Create(cts, se, a);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                var b = false;
+                                b = se.MoveNext();
+                                if (b)
+                                    e = se.Current.GetEnumerator();
+
+                                if (!b)
+                                {
+                                    return false;
+                                }
+
+                                a.Disposable = e;
+                            }
+
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                return true;
+                            }
+                            e.Dispose();
+                            e = null;
+
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        a
+                    );
+                });
         }
     }
 }

+ 49 - 57
Ix.NET/Source/System.Interactive.Async/Create.cs

@@ -1,6 +1,7 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
+
 using System;
 using System.Collections.Generic;
 using System.Linq;
@@ -11,57 +12,44 @@ namespace System.Linq
 {
     public static partial class AsyncEnumerable
     {
-        public static IAsyncEnumerable<T> Create<T>(Func<IAsyncEnumerator<T>> getEnumerator)
+        public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<IAsyncEnumerator<T>> getEnumerator)
         {
             return new AnonymousAsyncEnumerable<T>(getEnumerator);
         }
 
-        private class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
+        public static IAsyncEnumerator<T> CreateEnumerator<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
         {
-            private Func<IAsyncEnumerator<T>> getEnumerator;
-
-            public AnonymousAsyncEnumerable(Func<IAsyncEnumerator<T>> getEnumerator)
-            {
-                this.getEnumerator = getEnumerator;
-            }
-
-            public IAsyncEnumerator<T> GetEnumerator()
-            {
-                return getEnumerator();
-            }
+            return new AnonymousAsyncEnumerator<T>(moveNext, current, dispose);
         }
 
-        private static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current,
-            Action dispose, IDisposable enumerator)
+        private static IAsyncEnumerator<T> CreateEnumerator<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current,
+                                                               Action dispose, IDisposable enumerator)
         {
-            return Create(async ct =>
-            {
-                using (ct.Register(dispose))
+            return CreateEnumerator(
+                async ct =>
                 {
-                    try
+                    using (ct.Register(dispose))
                     {
-                        var result = await moveNext(ct).ConfigureAwait(false);
-                        if (!result)
+                        try
+                        {
+                            var result = await moveNext(ct)
+                                             .ConfigureAwait(false);
+                            if (!result)
+                            {
+                                enumerator?.Dispose();
+                            }
+                            return result;
+                        }
+                        catch
                         {
                             enumerator?.Dispose();
+                            throw;
                         }
-                        return result;
                     }
-                    catch
-                    {
-                        enumerator?.Dispose();
-                        throw;
-                    }
-                }
-            }, current, dispose);
+                }, current, dispose);
         }
 
-        public static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
-        {
-            return new AnonymousAsyncEnumerator<T>(moveNext, current, dispose);
-        }
-
-        private static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, TaskCompletionSource<bool>, Task<bool>> moveNext, Func<T> current, Action dispose)
+        private static IAsyncEnumerator<T> CreateEnumerator<T>(Func<CancellationToken, TaskCompletionSource<bool>, Task<bool>> moveNext, Func<T> current, Action dispose)
         {
             var self = default(IAsyncEnumerator<T>);
             self = new AnonymousAsyncEnumerator<T>(
@@ -69,15 +57,17 @@ namespace System.Linq
                 {
                     var tcs = new TaskCompletionSource<bool>();
 
-                    var stop = new Action(() =>
-                    {
-                        self.Dispose();
-                        tcs.TrySetCanceled();
-                    });
+                    var stop = new Action(
+                        () =>
+                        {
+                            self.Dispose();
+                            tcs.TrySetCanceled();
+                        });
 
                     using (ct.Register(stop))
                     {
-                        return await moveNext(ct, tcs).ConfigureAwait(false);
+                        return await moveNext(ct, tcs)
+                                   .ConfigureAwait(false);
                     }
                 },
                 current,
@@ -86,11 +76,26 @@ namespace System.Linq
             return self;
         }
 
+        private class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
+        {
+            private readonly Func<IAsyncEnumerator<T>> getEnumerator;
+
+            public AnonymousAsyncEnumerable(Func<IAsyncEnumerator<T>> getEnumerator)
+            {
+                this.getEnumerator = getEnumerator;
+            }
+
+            public IAsyncEnumerator<T> GetEnumerator()
+            {
+                return getEnumerator();
+            }
+        }
+
         private class AnonymousAsyncEnumerator<T> : IAsyncEnumerator<T>
         {
-            private readonly Func<CancellationToken, Task<bool>> _moveNext;
             private readonly Func<T> _current;
             private readonly Action _dispose;
+            private readonly Func<CancellationToken, Task<bool>> _moveNext;
             private bool _disposed;
 
             public AnonymousAsyncEnumerator(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
@@ -108,13 +113,7 @@ namespace System.Linq
                 return _moveNext(cancellationToken);
             }
 
-            public T Current
-            {
-                get
-                {
-                    return _current();
-                }
-            }
+            public T Current => _current();
 
             public void Dispose()
             {
@@ -125,12 +124,5 @@ namespace System.Linq
                 }
             }
         }
-
-
-
-
-      
-
-
     }
-}
+}

+ 39 - 38
Ix.NET/Source/System.Interactive.Async/DefaultIfEmpty.cs

@@ -1,6 +1,6 @@
-// // Licensed to the .NET Foundation under one or more agreements.
-// // The .NET Foundation licenses this file to you under the Apache 2.0 License.
-// // See the LICENSE file in the project root for more information. 
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
 
 using System;
 using System.Collections.Generic;
@@ -17,44 +17,45 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-                          {
-                              var done = false;
-                              var hasElements = false;
-                              var e = source.GetEnumerator();
-                              var current = default(TSource);
+            return CreateEnumerable(
+                () =>
+                {
+                    var done = false;
+                    var hasElements = false;
+                    var e = source.GetEnumerator();
+                    var current = default(TSource);
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (done)
-                                          return false;
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          hasElements = true;
-                                          current = e.Current;
-                                          return true;
-                                      }
-                                      done = true;
-                                      if (!hasElements)
-                                      {
-                                          current = defaultValue;
-                                          return true;
-                                      }
-                                      return false;
-                                  };
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (done)
+                                return false;
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                hasElements = true;
+                                current = e.Current;
+                                return true;
+                            }
+                            done = true;
+                            if (!hasElements)
+                            {
+                                current = defaultValue;
+                                return true;
+                            }
+                            return false;
+                        };
 
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> DefaultIfEmpty<TSource>(this IAsyncEnumerable<TSource> source)

+ 3 - 2
Ix.NET/Source/System.Interactive.Async/Defer.cs

@@ -16,8 +16,9 @@ namespace System.Linq
             if (factory == null)
                 throw new ArgumentNullException(nameof(factory));
 
-            return Create(() => factory()
-                              .GetEnumerator());
+            return CreateEnumerable(
+                () => factory()
+                    .GetEnumerator());
         }
     }
 }

+ 50 - 49
Ix.NET/Source/System.Interactive.Async/Distinct.cs

@@ -102,55 +102,56 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> DistinctUntilChanged_<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
         {
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var currentKey = default(TKey);
-                              var hasCurrentKey = false;
-                              var current = default(TSource);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          var item = e.Current;
-                                          var key = default(TKey);
-                                          var comparerEquals = false;
-
-                                          key = keySelector(item);
-
-                                          if (hasCurrentKey)
-                                          {
-                                              comparerEquals = comparer.Equals(currentKey, key);
-                                          }
-
-                                          if (!hasCurrentKey || !comparerEquals)
-                                          {
-                                              hasCurrentKey = true;
-                                              currentKey = key;
-
-                                              current = item;
-                                              return true;
-                                          }
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var currentKey = default(TKey);
+                    var hasCurrentKey = false;
+                    var current = default(TSource);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                var item = e.Current;
+                                var key = default(TKey);
+                                var comparerEquals = false;
+
+                                key = keySelector(item);
+
+                                if (hasCurrentKey)
+                                {
+                                    comparerEquals = comparer.Equals(currentKey, key);
+                                }
+
+                                if (!hasCurrentKey || !comparerEquals)
+                                {
+                                    hasCurrentKey = true;
+                                    currentKey = key;
+
+                                    current = item;
+                                    return true;
+                                }
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 46 - 45
Ix.NET/Source/System.Interactive.Async/Do.cs

@@ -72,51 +72,52 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> DoHelper<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
         {
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var current = default(TSource);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      try
-                                      {
-                                          var result = await e.MoveNext(ct)
-                                                              .ConfigureAwait(false);
-                                          if (!result)
-                                          {
-                                              onCompleted();
-                                          }
-                                          else
-                                          {
-                                              current = e.Current;
-                                              onNext(current);
-                                          }
-                                          return result;
-                                      }
-                                      catch (OperationCanceledException)
-                                      {
-                                          throw;
-                                      }
-                                      catch (Exception ex)
-                                      {
-                                          onError(ex);
-                                          throw;
-                                      }
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var current = default(TSource);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            try
+                            {
+                                var result = await e.MoveNext(ct)
+                                                    .ConfigureAwait(false);
+                                if (!result)
+                                {
+                                    onCompleted();
+                                }
+                                else
+                                {
+                                    current = e.Current;
+                                    onNext(current);
+                                }
+                                return result;
+                            }
+                            catch (OperationCanceledException)
+                            {
+                                throw;
+                            }
+                            catch (Exception ex)
+                            {
+                                onError(ex);
+                                throw;
+                            }
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 33 - 32
Ix.NET/Source/System.Interactive.Async/Except.cs

@@ -1,6 +1,6 @@
-// // Licensed to the .NET Foundation under one or more agreements.
-// // The .NET Foundation licenses this file to you under the Apache 2.0 License.
-// // See the LICENSE file in the project root for more information. 
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
 
 using System;
 using System.Collections.Generic;
@@ -31,39 +31,40 @@ namespace System.Linq
             if (comparer == null)
                 throw new ArgumentNullException(nameof(comparer));
 
-            return Create(() =>
-                          {
-                              var e = first.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = first.GetEnumerator();
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var mapTask = default(Task<Dictionary<TSource, TSource>>);
-                              var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(
-                                  ct => mapTask ?? (mapTask = second.ToDictionary(x => x, comparer, ct)));
+                    var mapTask = default(Task<Dictionary<TSource, TSource>>);
+                    var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(
+                        ct => mapTask ?? (mapTask = second.ToDictionary(x => x, comparer, ct)));
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .Zip(getMapTask(ct), (b, _) => b)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          if (!mapTask.Result.ContainsKey(e.Current))
-                                              return true;
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .Zip(getMapTask(ct), (b, _) => b)
+                                       .ConfigureAwait(false))
+                            {
+                                if (!mapTask.Result.ContainsKey(e.Current))
+                                    return true;
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
 
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 45 - 44
Ix.NET/Source/System.Interactive.Async/Expand.cs

@@ -19,58 +19,59 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e = default(IAsyncEnumerator<TSource>);
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = default(IAsyncEnumerator<TSource>);
 
-                              var cts = new CancellationTokenDisposable();
-                              var a = new AssignableDisposable();
-                              var d = Disposable.Create(cts, a);
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable();
+                    var d = Disposable.Create(cts, a);
 
-                              var queue = new Queue<IAsyncEnumerable<TSource>>();
-                              queue.Enqueue(source);
+                    var queue = new Queue<IAsyncEnumerable<TSource>>();
+                    queue.Enqueue(source);
 
-                              var current = default(TSource);
+                    var current = default(TSource);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (e == null)
-                                      {
-                                          if (queue.Count > 0)
-                                          {
-                                              var src = queue.Dequeue();
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                if (queue.Count > 0)
+                                {
+                                    var src = queue.Dequeue();
 
-                                              e = src.GetEnumerator();
+                                    e = src.GetEnumerator();
 
-                                              a.Disposable = e;
-                                              return await f(ct)
-                                                         .ConfigureAwait(false);
-                                          }
-                                          return false;
-                                      }
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          var item = e.Current;
-                                          var next = selector(item);
+                                    a.Disposable = e;
+                                    return await f(ct)
+                                               .ConfigureAwait(false);
+                                }
+                                return false;
+                            }
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                var item = e.Current;
+                                var next = selector(item);
 
-                                          queue.Enqueue(next);
-                                          current = item;
-                                          return true;
-                                      }
-                                      e = null;
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
+                                queue.Enqueue(next);
+                                current = item;
+                                return true;
+                            }
+                            e = null;
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
 
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 14 - 13
Ix.NET/Source/System.Interactive.Async/Finally.cs

@@ -18,21 +18,22 @@ namespace System.Linq
             if (finallyAction == null)
                 throw new ArgumentNullException(nameof(finallyAction));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
 
-                              var cts = new CancellationTokenDisposable();
-                              var r = new Disposable(finallyAction);
-                              var d = Disposable.Create(cts, e, r);
+                    var cts = new CancellationTokenDisposable();
+                    var r = new Disposable(finallyAction);
+                    var d = Disposable.Create(cts, e, r);
 
-                              return Create(
-                                  ct => e.MoveNext(ct),
-                                  () => e.Current,
-                                  d.Dispose,
-                                  r
-                              );
-                          });
+                    return CreateEnumerator(
+                        ct => e.MoveNext(ct),
+                        () => e.Current,
+                        d.Dispose,
+                        r
+                    );
+                });
         }
     }
 }

+ 38 - 37
Ix.NET/Source/System.Interactive.Async/Generate.cs

@@ -20,43 +20,44 @@ namespace System.Linq
             if (resultSelector == null)
                 throw new ArgumentNullException(nameof(resultSelector));
 
-            return Create(() =>
-                          {
-                              var i = initialState;
-                              var started = false;
-                              var current = default(TResult);
-
-                              return Create(
-                                  ct =>
-                                  {
-                                      var b = false;
-                                      try
-                                      {
-                                          if (started)
-                                              i = iterate(i);
-
-                                          b = condition(i);
-
-                                          if (b)
-                                              current = resultSelector(i);
-                                      }
-                                      catch (Exception ex)
-                                      {
-                                          return TaskExt.Throw<bool>(ex);
-                                      }
-
-                                      if (!b)
-                                          return TaskExt.False;
-
-                                      if (!started)
-                                          started = true;
-
-                                      return TaskExt.True;
-                                  },
-                                  () => current,
-                                  () => { }
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var i = initialState;
+                    var started = false;
+                    var current = default(TResult);
+
+                    return CreateEnumerator(
+                        ct =>
+                        {
+                            var b = false;
+                            try
+                            {
+                                if (started)
+                                    i = iterate(i);
+
+                                b = condition(i);
+
+                                if (b)
+                                    current = resultSelector(i);
+                            }
+                            catch (Exception ex)
+                            {
+                                return TaskExt.Throw<bool>(ex);
+                            }
+
+                            if (!b)
+                                return TaskExt.False;
+
+                            if (!started)
+                                started = true;
+
+                            return TaskExt.True;
+                        },
+                        () => current,
+                        () => { }
+                    );
+                });
         }
     }
 }

+ 3 - 3
Ix.NET/Source/System.Interactive.Async/Grouping.cs

@@ -25,7 +25,7 @@ namespace System.Linq
             if (comparer == null)
                 throw new ArgumentNullException(nameof(comparer));
 
-            return Create(() =>
+            return CreateEnumerable(() =>
                           {
                               var gate = new object();
 
@@ -139,7 +139,7 @@ namespace System.Linq
                                                  .ConfigureAwait(false);
                                   };
 
-                              return Create(
+                              return CreateEnumerator(
                                   f,
                                   () => current,
                                   d.Dispose,
@@ -306,7 +306,7 @@ namespace System.Linq
                         return false;
                     };
 
-                return Create(
+                return CreateEnumerator(
                     ct =>
                     {
                         ++index;

+ 24 - 23
Ix.NET/Source/System.Interactive.Async/IgnoreElements.cs

@@ -17,33 +17,34 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (!await e.MoveNext(ct)
-                                                  .ConfigureAwait(false))
-                                      {
-                                          return false;
-                                      }
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (!await e.MoveNext(ct)
+                                        .ConfigureAwait(false))
+                            {
+                                return false;
+                            }
 
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
 
-                              return Create<TSource>(
-                                  f,
-                                  () => { throw new InvalidOperationException(); },
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator<TSource>(
+                        f,
+                        () => { throw new InvalidOperationException(); },
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 37 - 36
Ix.NET/Source/System.Interactive.Async/Intersect.cs

@@ -21,46 +21,47 @@ namespace System.Linq
             if (comparer == null)
                 throw new ArgumentNullException(nameof(comparer));
 
-            return Create(() =>
-                          {
-                              var e = first.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = first.GetEnumerator();
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var mapTask = default(Task<Dictionary<TSource, TSource>>);
-                              var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(
-                                  ct =>
-                                  {
-                                      if (mapTask == null)
-                                          mapTask = second.ToDictionary(x => x, comparer, ct);
-                                      return mapTask;
-                                  });
+                    var mapTask = default(Task<Dictionary<TSource, TSource>>);
+                    var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(
+                        ct =>
+                        {
+                            if (mapTask == null)
+                                mapTask = second.ToDictionary(x => x, comparer, ct);
+                            return mapTask;
+                        });
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .Zip(getMapTask(ct), (b, _) => b)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          // Note: Result here is safe because the task
-                                          // was completed in the Zip() call above
-                                          if (mapTask.Result.ContainsKey(e.Current))
-                                              return true;
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .Zip(getMapTask(ct), (b, _) => b)
+                                       .ConfigureAwait(false))
+                            {
+                                // Note: Result here is safe because the task
+                                // was completed in the Zip() call above
+                                if (mapTask.Result.ContainsKey(e.Current))
+                                    return true;
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
 
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
 

+ 2 - 2
Ix.NET/Source/System.Interactive.Async/Join.cs

@@ -27,7 +27,7 @@ namespace System.Linq
             if (comparer == null)
                 throw new ArgumentNullException(nameof(comparer));
 
-            return Create(
+            return CreateEnumerable(
                 () =>
                 {
                     var oe = outer.GetEnumerator();
@@ -147,7 +147,7 @@ namespace System.Linq
                                        .ConfigureAwait(false);
                         };
 
-                    return Create(
+                    return CreateEnumerator(
                         f,
                         () => current,
                         d.Dispose,

+ 47 - 46
Ix.NET/Source/System.Interactive.Async/OnErrorResumeNext.cs

@@ -40,58 +40,59 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
-            return Create(() =>
-                          {
-                              var se = sources.GetEnumerator();
-                              var e = default(IAsyncEnumerator<TSource>);
+            return CreateEnumerable(
+                () =>
+                {
+                    var se = sources.GetEnumerator();
+                    var e = default(IAsyncEnumerator<TSource>);
 
-                              var cts = new CancellationTokenDisposable();
-                              var a = new AssignableDisposable();
-                              var d = Disposable.Create(cts, se, a);
+                    var cts = new CancellationTokenDisposable();
+                    var a = new AssignableDisposable();
+                    var d = Disposable.Create(cts, se, a);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (e == null)
-                                      {
-                                          var b = false;
-                                          b = se.MoveNext();
-                                          if (b)
-                                              e = se.Current.GetEnumerator();
-                                          else
-                                          {
-                                              return false;
-                                          }
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                var b = false;
+                                b = se.MoveNext();
+                                if (b)
+                                    e = se.Current.GetEnumerator();
+                                else
+                                {
+                                    return false;
+                                }
 
-                                          a.Disposable = e;
-                                      }
+                                a.Disposable = e;
+                            }
 
-                                      try
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              return true;
-                                          }
-                                      }
-                                      catch
-                                      {
-                                          // ignore
-                                      }
+                            try
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    return true;
+                                }
+                            }
+                            catch
+                            {
+                                // ignore
+                            }
 
-                                      e.Dispose();
-                                      e = null;
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
+                            e.Dispose();
+                            e = null;
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
 
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  a
-                              );
-                          });
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        a
+                    );
+                });
         }
     }
 }

+ 4 - 4
Ix.NET/Source/System.Interactive.Async/OrderBy.cs

@@ -22,11 +22,11 @@ namespace System.Linq
                 throw new ArgumentNullException(nameof(comparer));
 
             return new OrderedAsyncEnumerable<TSource, TKey>(
-                Create(() =>
+                CreateEnumerable(() =>
                        {
                            var current = default(IEnumerable<TSource>);
 
-                           return Create(
+                           return CreateEnumerator(
                                async ct =>
                                {
                                    if (current == null)
@@ -152,7 +152,7 @@ namespace System.Linq
 
             private IAsyncEnumerable<IEnumerable<T>> Classes()
             {
-                return Create(() =>
+                return CreateEnumerable(() =>
                               {
                                   var e = equivalenceClasses.GetEnumerator();
                                   var list = new List<IEnumerable<T>>();
@@ -182,7 +182,7 @@ namespace System.Linq
                                           return e1.MoveNext();
                                       };
 
-                                  return Create(
+                                  return CreateEnumerator(
                                       async ct =>
                                       {
                                           if (e1 != null)

+ 91 - 89
Ix.NET/Source/System.Interactive.Async/Repeat.cs

@@ -23,17 +23,17 @@ namespace System.Linq
 
         public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element)
         {
-            return Create(() =>
-                          {
-                              return Create(
-                                  ct => TaskExt.True,
-                                  () => element,
-                                  () => { }
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    return CreateEnumerator(
+                        ct => TaskExt.True,
+                        () => element,
+                        () => { }
+                    );
+                });
         }
 
-
         public static IAsyncEnumerable<TSource> Repeat<TSource>(this IAsyncEnumerable<TSource> source, int count)
         {
             if (source == null)
@@ -41,49 +41,50 @@ namespace System.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return Create(() =>
-                          {
-                              var e = default(IAsyncEnumerator<TSource>);
-                              var a = new AssignableDisposable();
-                              var n = count;
-                              var current = default(TSource);
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, a);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (e == null)
-                                      {
-                                          if (n-- == 0)
-                                          {
-                                              return false;
-                                          }
-
-                                          e = source.GetEnumerator();
-
-                                          a.Disposable = e;
-                                      }
-
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          current = e.Current;
-                                          return true;
-                                      }
-                                      e = null;
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = default(IAsyncEnumerator<TSource>);
+                    var a = new AssignableDisposable();
+                    var n = count;
+                    var current = default(TSource);
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, a);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                if (n-- == 0)
+                                {
+                                    return false;
+                                }
+
+                                e = source.GetEnumerator();
+
+                                a.Disposable = e;
+                            }
+
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                current = e.Current;
+                                return true;
+                            }
+                            e = null;
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> Repeat<TSource>(this IAsyncEnumerable<TSource> source)
@@ -91,43 +92,44 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-                          {
-                              var e = default(IAsyncEnumerator<TSource>);
-                              var a = new AssignableDisposable();
-                              var current = default(TSource);
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, a);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (e == null)
-                                      {
-                                          e = source.GetEnumerator();
-
-                                          a.Disposable = e;
-                                      }
-
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          current = e.Current;
-                                          return true;
-                                      }
-                                      e = null;
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = default(IAsyncEnumerator<TSource>);
+                    var a = new AssignableDisposable();
+                    var current = default(TSource);
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, a);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (e == null)
+                            {
+                                e = source.GetEnumerator();
+
+                                a.Disposable = e;
+                            }
+
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                current = e.Current;
+                                return true;
+                            }
+                            e = null;
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 30 - 28
Ix.NET/Source/System.Interactive.Async/Reverse.cs

@@ -16,36 +16,38 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var stack = default(Stack<TSource>);
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var stack = default(Stack<TSource>);
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (stack == null)
-                                      {
-                                          stack = await Create(() => e)
-                                                      .Aggregate(new Stack<TSource>(), (s, x) =>
-                                                                                       {
-                                                                                           s.Push(x);
-                                                                                           return s;
-                                                                                       }, cts.Token)
-                                                      .ConfigureAwait(false);
-                                          return stack.Count > 0;
-                                      }
-                                      stack.Pop();
-                                      return stack.Count > 0;
-                                  },
-                                  () => stack.Peek(),
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (stack == null)
+                            {
+                                stack = await CreateEnumerable(
+                                                () => e)
+                                            .Aggregate(new Stack<TSource>(), (s, x) =>
+                                                                             {
+                                                                                 s.Push(x);
+                                                                                 return s;
+                                                                             }, cts.Token)
+                                            .ConfigureAwait(false);
+                                return stack.Count > 0;
+                            }
+                            stack.Pop();
+                            return stack.Count > 0;
+                        },
+                        () => stack.Peek(),
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 78 - 76
Ix.NET/Source/System.Interactive.Async/Scan.cs

@@ -19,39 +19,40 @@ namespace System.Linq
             if (accumulator == null)
                 throw new ArgumentNullException(nameof(accumulator));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var acc = seed;
-                              var current = default(TAccumulate);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (!await e.MoveNext(ct)
-                                                  .ConfigureAwait(false))
-                                      {
-                                          return false;
-                                      }
-
-                                      var item = e.Current;
-                                      acc = accumulator(acc, item);
-
-                                      current = acc;
-                                      return true;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var acc = seed;
+                    var current = default(TAccumulate);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (!await e.MoveNext(ct)
+                                        .ConfigureAwait(false))
+                            {
+                                return false;
+                            }
+
+                            var item = e.Current;
+                            acc = accumulator(acc, item);
+
+                            current = acc;
+                            return true;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
@@ -61,49 +62,50 @@ namespace System.Linq
             if (accumulator == null)
                 throw new ArgumentNullException(nameof(accumulator));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var hasSeed = false;
-                              var acc = default(TSource);
-                              var current = default(TSource);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (!await e.MoveNext(ct)
-                                                  .ConfigureAwait(false))
-                                      {
-                                          return false;
-                                      }
-
-                                      var item = e.Current;
-
-                                      if (!hasSeed)
-                                      {
-                                          hasSeed = true;
-                                          acc = item;
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-
-                                      acc = accumulator(acc, item);
-
-                                      current = acc;
-                                      return true;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var hasSeed = false;
+                    var acc = default(TSource);
+                    var current = default(TSource);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (!await e.MoveNext(ct)
+                                        .ConfigureAwait(false))
+                            {
+                                return false;
+                            }
+
+                            var item = e.Current;
+
+                            if (!hasSeed)
+                            {
+                                hasSeed = true;
+                                acc = item;
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+
+                            acc = accumulator(acc, item);
+
+                            current = acc;
+                            return true;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 47 - 45
Ix.NET/Source/System.Interactive.Async/Select.cs

@@ -18,30 +18,31 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var current = default(TResult);
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var current = default(TResult);
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (await e.MoveNext(cts.Token)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          current = selector(e.Current);
-                                          return true;
-                                      }
-                                      return false;
-                                  },
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (await e.MoveNext(cts.Token)
+                                       .ConfigureAwait(false))
+                            {
+                                current = selector(e.Current);
+                                return true;
+                            }
+                            return false;
+                        },
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, TResult> selector)
@@ -51,31 +52,32 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var current = default(TResult);
-                              var index = 0;
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var current = default(TResult);
+                    var index = 0;
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (await e.MoveNext(cts.Token)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          current = selector(e.Current, checked(index++));
-                                          return true;
-                                      }
-                                      return false;
-                                  },
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (await e.MoveNext(cts.Token)
+                                       .ConfigureAwait(false))
+                            {
+                                current = selector(e.Current, checked(index++));
+                                return true;
+                            }
+                            return false;
+                        },
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 90 - 88
Ix.NET/Source/System.Interactive.Async/SelectMany.cs

@@ -30,52 +30,53 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var ie = default(IAsyncEnumerator<TResult>);
-
-                              var innerDisposable = new AssignableDisposable();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, innerDisposable, e);
-
-                              var inner = default(Func<CancellationToken, Task<bool>>);
-                              var outer = default(Func<CancellationToken, Task<bool>>);
-
-                              inner = async ct =>
-                                      {
-                                          if (await ie.MoveNext(ct)
-                                                      .ConfigureAwait(false))
-                                          {
-                                              return true;
-                                          }
-                                          innerDisposable.Disposable = null;
-                                          return await outer(ct)
-                                                     .ConfigureAwait(false);
-                                      };
-
-                              outer = async ct =>
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              var enumerable = selector(e.Current);
-                                              ie = enumerable.GetEnumerator();
-                                              innerDisposable.Disposable = ie;
-
-                                              return await inner(ct)
-                                                         .ConfigureAwait(false);
-                                          }
-                                          return false;
-                                      };
-
-                              return Create(ct => ie == null ? outer(cts.Token) : inner(cts.Token),
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var ie = default(IAsyncEnumerator<TResult>);
+
+                    var innerDisposable = new AssignableDisposable();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, innerDisposable, e);
+
+                    var inner = default(Func<CancellationToken, Task<bool>>);
+                    var outer = default(Func<CancellationToken, Task<bool>>);
+
+                    inner = async ct =>
+                            {
+                                if (await ie.MoveNext(ct)
+                                            .ConfigureAwait(false))
+                                {
+                                    return true;
+                                }
+                                innerDisposable.Disposable = null;
+                                return await outer(ct)
+                                           .ConfigureAwait(false);
+                            };
+
+                    outer = async ct =>
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    var enumerable = selector(e.Current);
+                                    ie = enumerable.GetEnumerator();
+                                    innerDisposable.Disposable = ie;
+
+                                    return await inner(ct)
+                                               .ConfigureAwait(false);
+                                }
+                                return false;
+                            };
+
+                    return CreateEnumerator(ct => ie == null ? outer(cts.Token) : inner(cts.Token),
                                             () => ie.Current,
                                             d.Dispose,
                                             e
-                              );
-                          });
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
@@ -85,54 +86,55 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var ie = default(IAsyncEnumerator<TResult>);
-
-                              var index = 0;
-
-                              var innerDisposable = new AssignableDisposable();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, innerDisposable, e);
-
-                              var inner = default(Func<CancellationToken, Task<bool>>);
-                              var outer = default(Func<CancellationToken, Task<bool>>);
-
-                              inner = async ct =>
-                                      {
-                                          if (await ie.MoveNext(ct)
-                                                      .ConfigureAwait(false))
-                                          {
-                                              return true;
-                                          }
-                                          innerDisposable.Disposable = null;
-                                          return await outer(ct)
-                                                     .ConfigureAwait(false);
-                                      };
-
-                              outer = async ct =>
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              var enumerable = selector(e.Current, checked(index++));
-                                              ie = enumerable.GetEnumerator();
-                                              innerDisposable.Disposable = ie;
-
-                                              return await inner(ct)
-                                                         .ConfigureAwait(false);
-                                          }
-                                          return false;
-                                      };
-
-                              return Create(ct => ie == null ? outer(cts.Token) : inner(cts.Token),
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var ie = default(IAsyncEnumerator<TResult>);
+
+                    var index = 0;
+
+                    var innerDisposable = new AssignableDisposable();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, innerDisposable, e);
+
+                    var inner = default(Func<CancellationToken, Task<bool>>);
+                    var outer = default(Func<CancellationToken, Task<bool>>);
+
+                    inner = async ct =>
+                            {
+                                if (await ie.MoveNext(ct)
+                                            .ConfigureAwait(false))
+                                {
+                                    return true;
+                                }
+                                innerDisposable.Disposable = null;
+                                return await outer(ct)
+                                           .ConfigureAwait(false);
+                            };
+
+                    outer = async ct =>
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    var enumerable = selector(e.Current, checked(index++));
+                                    ie = enumerable.GetEnumerator();
+                                    innerDisposable.Disposable = ie;
+
+                                    return await inner(ct)
+                                               .ConfigureAwait(false);
+                                }
+                                return false;
+                            };
+
+                    return CreateEnumerator(ct => ie == null ? outer(cts.Token) : inner(cts.Token),
                                             () => ie.Current,
                                             d.Dispose,
                                             e
-                              );
-                          });
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)

+ 145 - 141
Ix.NET/Source/System.Interactive.Async/Skip.cs

@@ -19,39 +19,40 @@ namespace System.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var n = count;
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      var moveNext = await e.MoveNext(ct)
-                                                            .ConfigureAwait(false);
-                                      if (n == 0)
-                                      {
-                                          return moveNext;
-                                      }
-                                      --n;
-                                      if (!moveNext)
-                                      {
-                                          return false;
-                                      }
-                                      return await f(ct)
-                                                 .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  ct => f(cts.Token),
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var n = count;
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            var moveNext = await e.MoveNext(ct)
+                                                  .ConfigureAwait(false);
+                            if (n == 0)
+                            {
+                                return moveNext;
+                            }
+                            --n;
+                            if (!moveNext)
+                            {
+                                return false;
+                            }
+                            return await f(ct)
+                                       .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        ct => f(cts.Token),
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> SkipLast<TSource>(this IAsyncEnumerable<TSource> source, int count)
@@ -61,43 +62,44 @@ namespace System.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var q = new Queue<TSource>();
-                              var current = default(TSource);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          var item = e.Current;
-
-                                          q.Enqueue(item);
-                                          if (q.Count > count)
-                                          {
-                                              current = q.Dequeue();
-                                              return true;
-                                          }
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var q = new Queue<TSource>();
+                    var current = default(TSource);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                var item = e.Current;
+
+                                q.Enqueue(item);
+                                if (q.Count > count)
+                                {
+                                    current = q.Dequeue();
+                                    return true;
+                                }
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> SkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
@@ -107,41 +109,42 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var skipping = true;
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (skipping)
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              if (predicate(e.Current))
-                                                  return await f(ct)
-                                                             .ConfigureAwait(false);
-                                              skipping = false;
-                                              return true;
-                                          }
-                                          return false;
-                                      }
-                                      return await e.MoveNext(ct)
-                                                    .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var skipping = true;
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (skipping)
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    if (predicate(e.Current))
+                                        return await f(ct)
+                                                   .ConfigureAwait(false);
+                                    skipping = false;
+                                    return true;
+                                }
+                                return false;
+                            }
+                            return await e.MoveNext(ct)
+                                          .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> SkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
@@ -151,42 +154,43 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var skipping = true;
-                              var index = 0;
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (skipping)
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              if (predicate(e.Current, checked(index++)))
-                                                  return await f(ct)
-                                                             .ConfigureAwait(false);
-                                              skipping = false;
-                                              return true;
-                                          }
-                                          return false;
-                                      }
-                                      return await e.MoveNext(ct)
-                                                    .ConfigureAwait(false);
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var skipping = true;
+                    var index = 0;
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (skipping)
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    if (predicate(e.Current, checked(index++)))
+                                        return await f(ct)
+                                                   .ConfigureAwait(false);
+                                    skipping = false;
+                                    return true;
+                                }
+                                return false;
+                            }
+                            return await e.MoveNext(ct)
+                                          .ConfigureAwait(false);
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 129 - 125
Ix.NET/Source/System.Interactive.Async/Take.cs

@@ -19,35 +19,36 @@ namespace System.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var n = count;
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (n == 0)
-                                          return false;
-
-                                      var result = await e.MoveNext(cts.Token)
-                                                          .ConfigureAwait(false);
-
-                                      --n;
-
-                                      if (n == 0)
-                                          e.Dispose();
-
-                                      return result;
-                                  },
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var n = count;
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (n == 0)
+                                return false;
+
+                            var result = await e.MoveNext(cts.Token)
+                                                .ConfigureAwait(false);
+
+                            --n;
+
+                            if (n == 0)
+                                e.Dispose();
+
+                            return result;
+                        },
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> TakeLast<TSource>(this IAsyncEnumerable<TSource> source, int count)
@@ -57,57 +58,58 @@ namespace System.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              var q = new Queue<TSource>(count);
-                              var done = false;
-                              var current = default(TSource);
-
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (!done)
-                                      {
-                                          if (await e.MoveNext(ct)
-                                                     .ConfigureAwait(false))
-                                          {
-                                              if (count > 0)
-                                              {
-                                                  var item = e.Current;
-                                                  if (q.Count >= count)
-                                                      q.Dequeue();
-                                                  q.Enqueue(item);
-                                              }
-                                          }
-                                          else
-                                          {
-                                              done = true;
-                                              e.Dispose();
-                                          }
-
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      if (q.Count > 0)
-                                      {
-                                          current = q.Dequeue();
-                                          return true;
-                                      }
-                                      return false;
-                                  };
-
-                              return Create(
-                                  f,
-                                  () => current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    var q = new Queue<TSource>(count);
+                    var done = false;
+                    var current = default(TSource);
+
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (!done)
+                            {
+                                if (await e.MoveNext(ct)
+                                           .ConfigureAwait(false))
+                                {
+                                    if (count > 0)
+                                    {
+                                        var item = e.Current;
+                                        if (q.Count >= count)
+                                            q.Dequeue();
+                                        q.Enqueue(item);
+                                    }
+                                }
+                                else
+                                {
+                                    done = true;
+                                    e.Dispose();
+                                }
+
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            if (q.Count > 0)
+                            {
+                                current = q.Dequeue();
+                                return true;
+                            }
+                            return false;
+                        };
+
+                    return CreateEnumerator(
+                        f,
+                        () => current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> TakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
@@ -117,28 +119,29 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (await e.MoveNext(cts.Token)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          return predicate(e.Current);
-                                      }
-                                      return false;
-                                  },
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (await e.MoveNext(cts.Token)
+                                       .ConfigureAwait(false))
+                            {
+                                return predicate(e.Current);
+                            }
+                            return false;
+                        },
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> TakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
@@ -148,29 +151,30 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var index = 0;
-
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
-
-                              return Create(
-                                  async ct =>
-                                  {
-                                      if (await e.MoveNext(cts.Token)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          return predicate(e.Current, checked(index++));
-                                      }
-                                      return false;
-                                  },
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var index = 0;
+
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
+
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            if (await e.MoveNext(cts.Token)
+                                       .ConfigureAwait(false))
+                            {
+                                return predicate(e.Current, checked(index++));
+                            }
+                            return false;
+                        },
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 54 - 51
Ix.NET/Source/System.Interactive.Async/ToAsyncEnumerable.cs

@@ -1,10 +1,12 @@
 // Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
+
 using System;
 using System.Collections.Generic;
-using System.Threading.Tasks;
+using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Linq
 {
@@ -15,29 +17,56 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-            {
-                var e = source.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
 
-                return Create(
-                    ct => Task.Run(() =>
-                    {
-                        var res = false;
-                        try
-                        {
-                            res = e.MoveNext();
-                        }
-                        finally
+                    return CreateEnumerator(
+                        ct => Task.Run(() =>
+                                       {
+                                           var res = false;
+                                           try
+                                           {
+                                               res = e.MoveNext();
+                                           }
+                                           finally
+                                           {
+                                               if (!res)
+                                                   e.Dispose();
+                                           }
+                                           return res;
+                                       }, ct),
+                        () => e.Current,
+                        () => e.Dispose()
+                    );
+                });
+        }
+
+        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this Task<TSource> task)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+
+            return CreateEnumerable(
+                () =>
+                {
+                    var called = 0;
+
+                    var value = default(TSource);
+                    return CreateEnumerator(
+                        async ct =>
                         {
-                            if (!res)
-                                e.Dispose();
-                        }
-                        return res;
-                    }, ct),
-                    () => e.Current,
-                    () => e.Dispose()
-                );
-            });
+                            if (Interlocked.CompareExchange(ref called, 1, 0) == 0)
+                            {
+                                value = await task.ConfigureAwait(false);
+                                return true;
+                            }
+                            return false;
+                        },
+                        () => value,
+                        () => { });
+                });
         }
 
         public static IEnumerable<TSource> ToEnumerable<TSource>(this IAsyncEnumerable<TSource> source)
@@ -54,39 +83,13 @@ namespace System.Linq
             {
                 while (true)
                 {
-                    if (!e.MoveNext(CancellationToken.None).Result)
+                    if (!e.MoveNext(CancellationToken.None)
+                          .Result)
                         break;
                     var c = e.Current;
                     yield return c;
                 }
             }
         }
-
-        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this Task<TSource> task)
-        {
-            if (task == null)
-                throw new ArgumentNullException(nameof(task));
-            
-            return Create(() =>
-            {
-                var called = 0;
-
-                var value = default(TSource);
-                return Create(
-                    async ct =>
-                    {
-                        if (Interlocked.CompareExchange(ref called, 1, 0) == 0)
-                        {
-                            value = await task.ConfigureAwait(false);
-                            return true;
-                        }
-                        return false;
-                    },
-                    () => value,
-                    () => { });
-            });
-        }
-
-
     }
-}
+}

+ 57 - 56
Ix.NET/Source/System.Interactive.Async/ToObservable.cs

@@ -16,62 +16,63 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create(() =>
-                          {
-                              var observer = new ToAsyncEnumerableObserver<TSource>();
-
-                              var subscription = source.Subscribe(observer);
-
-                              return Create(
-                                  (ct, tcs) =>
-                                  {
-                                      var hasValue = false;
-                                      var hasCompleted = false;
-                                      var error = default(Exception);
-
-                                      lock (observer.SyncRoot)
-                                      {
-                                          if (observer.Values.Count > 0)
-                                          {
-                                              hasValue = true;
-                                              observer.Current = observer.Values.Dequeue();
-                                          }
-                                          else if (observer.HasCompleted)
-                                          {
-                                              hasCompleted = true;
-                                          }
-                                          else if (observer.Error != null)
-                                          {
-                                              error = observer.Error;
-                                          }
-                                          else
-                                          {
-                                              observer.TaskCompletionSource = tcs;
-                                          }
-                                      }
-
-                                      if (hasValue)
-                                      {
-                                          tcs.TrySetResult(true);
-                                      }
-                                      else if (hasCompleted)
-                                      {
-                                          tcs.TrySetResult(false);
-                                      }
-                                      else if (error != null)
-                                      {
-                                          tcs.TrySetException(error);
-                                      }
-
-                                      return tcs.Task;
-                                  },
-                                  () => observer.Current,
-                                  () =>
-                                  {
-                                      subscription.Dispose();
-                                      // Should we cancel in-flight operations somehow?
-                                  });
-                          });
+            return CreateEnumerable(
+                () =>
+                {
+                    var observer = new ToAsyncEnumerableObserver<TSource>();
+
+                    var subscription = source.Subscribe(observer);
+
+                    return CreateEnumerator(
+                        (ct, tcs) =>
+                        {
+                            var hasValue = false;
+                            var hasCompleted = false;
+                            var error = default(Exception);
+
+                            lock (observer.SyncRoot)
+                            {
+                                if (observer.Values.Count > 0)
+                                {
+                                    hasValue = true;
+                                    observer.Current = observer.Values.Dequeue();
+                                }
+                                else if (observer.HasCompleted)
+                                {
+                                    hasCompleted = true;
+                                }
+                                else if (observer.Error != null)
+                                {
+                                    error = observer.Error;
+                                }
+                                else
+                                {
+                                    observer.TaskCompletionSource = tcs;
+                                }
+                            }
+
+                            if (hasValue)
+                            {
+                                tcs.TrySetResult(true);
+                            }
+                            else if (hasCompleted)
+                            {
+                                tcs.TrySetResult(false);
+                            }
+                            else if (error != null)
+                            {
+                                tcs.TrySetException(error);
+                            }
+
+                            return tcs.Task;
+                        },
+                        () => observer.Current,
+                        () =>
+                        {
+                            subscription.Dispose();
+                            // Should we cancel in-flight operations somehow?
+                        });
+                });
         }
 
         public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)

+ 45 - 44
Ix.NET/Source/System.Interactive.Async/Using.cs

@@ -18,54 +18,55 @@ namespace System.Linq
             if (enumerableFactory == null)
                 throw new ArgumentNullException(nameof(enumerableFactory));
 
-            return Create(() =>
-                          {
-                              var resource = resourceFactory();
-                              var e = default(IAsyncEnumerator<TSource>);
+            return CreateEnumerable(
+                () =>
+                {
+                    var resource = resourceFactory();
+                    var e = default(IAsyncEnumerator<TSource>);
 
-                              try
-                              {
-                                  e = enumerableFactory(resource)
-                                      .GetEnumerator();
-                              }
-                              catch (Exception)
-                              {
-                                  resource.Dispose();
-                                  throw;
-                              }
+                    try
+                    {
+                        e = enumerableFactory(resource)
+                            .GetEnumerator();
+                    }
+                    catch (Exception)
+                    {
+                        resource.Dispose();
+                        throw;
+                    }
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, resource, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, resource, e);
 
-                              var current = default(TSource);
+                    var current = default(TSource);
 
-                              return Create(
-                                  async ct =>
-                                  {
-                                      bool res;
-                                      try
-                                      {
-                                          res = await e.MoveNext(cts.Token)
-                                                       .ConfigureAwait(false);
-                                      }
-                                      catch (Exception)
-                                      {
-                                          d.Dispose();
-                                          throw;
-                                      }
-                                      if (res)
-                                      {
-                                          current = e.Current;
-                                          return true;
-                                      }
-                                      d.Dispose();
-                                      return false;
-                                  },
-                                  () => current,
-                                  d.Dispose,
-                                  null
-                              );
-                          });
+                    return CreateEnumerator(
+                        async ct =>
+                        {
+                            bool res;
+                            try
+                            {
+                                res = await e.MoveNext(cts.Token)
+                                             .ConfigureAwait(false);
+                            }
+                            catch (Exception)
+                            {
+                                d.Dispose();
+                                throw;
+                            }
+                            if (res)
+                            {
+                                current = e.Current;
+                                return true;
+                            }
+                            d.Dispose();
+                            return false;
+                        },
+                        () => current,
+                        d.Dispose,
+                        null
+                    );
+                });
         }
     }
 }

+ 53 - 51
Ix.NET/Source/System.Interactive.Async/Where.cs

@@ -19,34 +19,35 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          if (predicate(e.Current))
-                                              return true;
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                if (predicate(e.Current))
+                                    return true;
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
 
-                              return Create(
-                                  ct => f(cts.Token),
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        ct => f(cts.Token),
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
 
         public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
@@ -56,35 +57,36 @@ namespace System.Linq
             if (predicate == null)
                 throw new ArgumentNullException(nameof(predicate));
 
-            return Create(() =>
-                          {
-                              var e = source.GetEnumerator();
-                              var index = 0;
+            return CreateEnumerable(
+                () =>
+                {
+                    var e = source.GetEnumerator();
+                    var index = 0;
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e);
 
-                              var f = default(Func<CancellationToken, Task<bool>>);
-                              f = async ct =>
-                                  {
-                                      if (await e.MoveNext(ct)
-                                                 .ConfigureAwait(false))
-                                      {
-                                          if (predicate(e.Current, checked(index++)))
-                                              return true;
-                                          return await f(ct)
-                                                     .ConfigureAwait(false);
-                                      }
-                                      return false;
-                                  };
+                    var f = default(Func<CancellationToken, Task<bool>>);
+                    f = async ct =>
+                        {
+                            if (await e.MoveNext(ct)
+                                       .ConfigureAwait(false))
+                            {
+                                if (predicate(e.Current, checked(index++)))
+                                    return true;
+                                return await f(ct)
+                                           .ConfigureAwait(false);
+                            }
+                            return false;
+                        };
 
-                              return Create(
-                                  ct => f(cts.Token),
-                                  () => e.Current,
-                                  d.Dispose,
-                                  e
-                              );
-                          });
+                    return CreateEnumerator(
+                        ct => f(cts.Token),
+                        () => e.Current,
+                        d.Dispose,
+                        e
+                    );
+                });
         }
     }
 }

+ 22 - 20
Ix.NET/Source/System.Interactive.Async/Zip.cs

@@ -20,28 +20,30 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return Create(() =>
-                          {
-                              var e1 = first.GetEnumerator();
-                              var e2 = second.GetEnumerator();
-                              var current = default(TResult);
+            return CreateEnumerable(
+                () =>
+                {
+                    var e1 = first.GetEnumerator();
+                    var e2 = second.GetEnumerator();
+                    var current = default(TResult);
 
-                              var cts = new CancellationTokenDisposable();
-                              var d = Disposable.Create(cts, e1, e2);
+                    var cts = new CancellationTokenDisposable();
+                    var d = Disposable.Create(cts, e1, e2);
 
-                              return Create(
-                                  ct => e1.MoveNext(cts.Token)
-                                          .Zip(e2.MoveNext(cts.Token), (f, s) =>
-                                                                       {
-                                                                           var result = f && s;
-                                                                           if (result)
-                                                                               current = selector(e1.Current, e2.Current);
-                                                                           return result;
-                                                                       }),
-                                  () => current,
-                                  d.Dispose
-                              );
-                          });
+                    return CreateEnumerator(
+                        ct => e1.MoveNext(cts.Token)
+                                .Zip(e2.MoveNext(cts.Token),
+                                     (f, s) =>
+                                     {
+                                         var result = f && s;
+                                         if (result)
+                                             current = selector(e1.Current, e2.Current);
+                                         return result;
+                                     }),
+                        () => current,
+                        d.Dispose
+                    );
+                });
         }
     }
 }