OrderedAsyncEnumerable.cs 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Linq
  10. {
  11. // NB: Large portions of the implementation are based on https://github.com/dotnet/corefx/blob/master/src/System.Linq/src/System/Linq/OrderedEnumerable.cs.
  12. internal abstract class OrderedAsyncEnumerable<TElement> : AsyncIterator<TElement>, IOrderedAsyncEnumerable<TElement>, IAsyncPartition<TElement>
  13. {
  14. protected readonly IAsyncEnumerable<TElement> _source;
  15. private TElement[] _buffer;
  16. private int[] _indexes;
  17. private int _index;
  /// <summary>
  /// Initializes the ordering with the sequence whose elements will be sorted.
  /// </summary>
  /// <param name="source">The sequence to order; asserted to be non-null in debug builds.</param>
  protected OrderedAsyncEnumerable(IAsyncEnumerable<TElement> source)
  {
      Debug.Assert(!(source is null));

      _source = source;
  }
  /// <summary>
  /// Creates a subsequent ordering keyed by a synchronous selector, passing this
  /// instance along as the parent ordering.
  /// </summary>
  IOrderedAsyncEnumerable<TElement> IOrderedAsyncEnumerable<TElement>.CreateOrderedEnumerable<TKey>(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending) =>
      new OrderedAsyncEnumerable<TElement, TKey>(_source, keySelector, comparer, descending, this);
  /// <summary>
  /// Creates a subsequent ordering keyed by an asynchronous (ValueTask-returning) selector,
  /// passing this instance along as the parent ordering.
  /// </summary>
  IOrderedAsyncEnumerable<TElement> IOrderedAsyncEnumerable<TElement>.CreateOrderedEnumerable<TKey>(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending) =>
      new OrderedAsyncEnumerableWithTask<TElement, TKey>(_source, keySelector, comparer, descending, this);
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Creates a subsequent ordering keyed by an asynchronous selector that also receives a
  /// cancellation token, passing this instance along as the parent ordering.
  /// </summary>
  /// <remarks>
  /// Guarded by the same NO_DEEP_CANCELLATION condition as the
  /// OrderedAsyncEnumerableWithTaskAndCancellation class it instantiates, so the file
  /// still compiles when that class is excluded.
  /// </remarks>
  IOrderedAsyncEnumerable<TElement> IOrderedAsyncEnumerable<TElement>.CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending)
  {
      return new OrderedAsyncEnumerableWithTaskAndCancellation<TElement, TKey>(_source, keySelector, comparer, descending, this);
  }
  #endif
  /// <summary>
  /// Advances the iterator. On the first call the whole source is buffered and a sorted
  /// index map is computed; subsequent calls yield buffered elements in map order.
  /// </summary>
  /// <returns><c>true</c> while an element was produced; <c>false</c> once exhausted or in any other state.</returns>
  protected override async ValueTask<bool> MoveNextCore()
  {
      if (_state == AsyncIteratorState.Allocated)
      {
          _buffer = await _source.ToArrayAsync(_cancellationToken).ConfigureAwait(false); // TODO: Use buffer.

          AsyncEnumerableSorter<TElement> sorter = GetAsyncEnumerableSorter(_cancellationToken);
          _indexes = await sorter.Sort(_buffer, _buffer.Length).ConfigureAwait(false);
          _index = 0;

          _state = AsyncIteratorState.Iterating;
      }

      if (_state != AsyncIteratorState.Iterating)
      {
          return false;
      }

      if (_index >= _buffer.Length)
      {
          // Every buffered element has been yielded: dispose and report completion.
          await DisposeAsync().ConfigureAwait(false);
          return false;
      }

      _current = _buffer[_indexes[_index++]];
      return true;
  }
  /// <summary>
  /// Drops the references to the buffered elements and their index map, then runs the base disposal.
  /// </summary>
  public override async ValueTask DisposeAsync()
  {
      _indexes = null;
      _buffer = null;

      await base.DisposeAsync().ConfigureAwait(false);
  }
  /// <summary>
  /// Builds the sorter for this ordering, chaining <paramref name="next"/> behind it.
  /// </summary>
  /// <param name="next">Sorter to chain after this one, or <c>null</c> when there is none.</param>
  /// <param name="cancellationToken">Token handed to the sorter construction.</param>
  internal abstract AsyncEnumerableSorter<TElement> GetAsyncEnumerableSorter(AsyncEnumerableSorter<TElement> next, CancellationToken cancellationToken);

  /// <summary>
  /// Builds the sorter for this ordering with no further sorter chained behind it.
  /// </summary>
  internal AsyncEnumerableSorter<TElement> GetAsyncEnumerableSorter(CancellationToken cancellationToken)
  {
      return GetAsyncEnumerableSorter(null, cancellationToken);
  }
  /// <summary>
  /// Buffers the source and returns a new array holding its elements in sorted order.
  /// </summary>
  /// <param name="cancellationToken">Token passed to buffering and sorting.</param>
  /// <returns>The sorted elements, or a shared empty array when the source has none.</returns>
  public async ValueTask<TElement[]> ToArrayAsync(CancellationToken cancellationToken)
  {
      var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
      var length = buffered.Length;

      if (length == 0)
      {
  #if NO_ARRAY_EMPTY
          return EmptyArray<TElement>.Value;
  #else
          return Array.Empty<TElement>();
  #endif
      }

      var items = buffered.Array;
      var order = await SortedMap(items, length, cancellationToken).ConfigureAwait(false);

      // order[k] is the index in items of the element that belongs at sorted position k.
      var sorted = new TElement[length];
      var position = 0;
      while (position < sorted.Length)
      {
          sorted[position] = items[order[position]];
          position++;
      }

      return sorted;
  }
  /// <summary>
  /// Buffers the source and returns the elements occupying sorted positions
  /// <paramref name="minIndexInclusive"/> through <paramref name="maxIndexInclusive"/>.
  /// </summary>
  /// <param name="minIndexInclusive">First sorted position to include.</param>
  /// <param name="maxIndexInclusive">Last sorted position to include; clamped to the last element.</param>
  /// <param name="cancellationToken">Token passed to buffering and sorting.</param>
  /// <returns>The requested slice, or a shared empty array when the source is too short.</returns>
  internal async ValueTask<TElement[]> ToArrayAsync(int minIndexInclusive, int maxIndexInclusive, CancellationToken cancellationToken)
  {
      var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
      var length = buffered.Length;

      if (length <= minIndexInclusive)
      {
  #if NO_ARRAY_EMPTY
          return EmptyArray<TElement>.Value;
  #else
          return Array.Empty<TElement>();
  #endif
      }

      // Clamp the upper bound to the last available position.
      var upper = length <= maxIndexInclusive ? length - 1 : maxIndexInclusive;
      var items = buffered.Array;

      if (minIndexInclusive == upper)
      {
          // A single position is requested: select it directly instead of sorting a range.
          AsyncEnumerableSorter<TElement> selector = GetAsyncEnumerableSorter(cancellationToken);
          TElement only = await selector.ElementAt(items, length, minIndexInclusive).ConfigureAwait(false);
          return new TElement[] { only };
      }

      var order = await SortedMap(items, length, minIndexInclusive, upper, cancellationToken).ConfigureAwait(false);

      var slice = new TElement[upper - minIndexInclusive + 1];
      for (var offset = 0; offset < slice.Length; offset++)
      {
          slice[offset] = items[order[minIndexInclusive + offset]];
      }

      return slice;
  }
  /// <summary>
  /// Buffers the source and returns a new list holding its elements in sorted order.
  /// </summary>
  /// <param name="cancellationToken">Token passed to buffering and sorting.</param>
  /// <returns>The sorted elements; a zero-capacity list when the source has none.</returns>
  public async ValueTask<List<TElement>> ToListAsync(CancellationToken cancellationToken)
  {
      var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
      var length = buffered.Length;

      if (length == 0)
      {
          return new List<TElement>(0);
      }

      var items = buffered.Array;
      var order = await SortedMap(items, length, cancellationToken).ConfigureAwait(false);

      var sorted = new List<TElement>(length);
      var position = 0;
      while (position < length)
      {
          sorted.Add(items[order[position]]);
          position++;
      }

      return sorted;
  }
  /// <summary>
  /// Buffers the source and returns a list of the elements occupying sorted positions
  /// <paramref name="minIndexInclusive"/> through <paramref name="maxIndexInclusive"/>.
  /// </summary>
  /// <param name="minIndexInclusive">First sorted position to include.</param>
  /// <param name="maxIndexInclusive">Last sorted position to include; clamped to the last element.</param>
  /// <param name="cancellationToken">Token passed to buffering and sorting.</param>
  /// <returns>The requested slice; a zero-capacity list when the source is too short.</returns>
  internal async ValueTask<List<TElement>> ToListAsync(int minIndexInclusive, int maxIndexInclusive, CancellationToken cancellationToken)
  {
      var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
      var length = buffered.Length;

      if (length <= minIndexInclusive)
      {
          return new List<TElement>(0);
      }

      // Clamp the upper bound to the last available position.
      var upper = length <= maxIndexInclusive ? length - 1 : maxIndexInclusive;
      var items = buffered.Array;

      if (minIndexInclusive == upper)
      {
          // A single position is requested: select it directly instead of sorting a range.
          AsyncEnumerableSorter<TElement> selector = GetAsyncEnumerableSorter(cancellationToken);
          TElement only = await selector.ElementAt(items, length, minIndexInclusive).ConfigureAwait(false);
          return new List<TElement>(1) { only };
      }

      var order = await SortedMap(items, length, minIndexInclusive, upper, cancellationToken).ConfigureAwait(false);

      var slice = new List<TElement>(upper - minIndexInclusive + 1);
      for (var position = minIndexInclusive; position <= upper; position++)
      {
          slice.Add(items[order[position]]);
      }

      return slice;
  }
  /// <summary>
  /// Gets the number of elements in the ordered sequence, which equals the source's count.
  /// </summary>
  /// <param name="onlyIfCheap">
  /// When <c>true</c>, the source is only counted if it is an <see cref="ICollection{T}"/> or
  /// <see cref="ICollection"/>; otherwise -1 is returned.
  /// </param>
  /// <param name="cancellationToken">Token passed to the underlying count operation.</param>
  /// <returns>The element count, or -1 when <paramref name="onlyIfCheap"/> is set and the count was not computed.</returns>
  public async ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken)
  {
      if (_source is IAsyncIListProvider<TElement> listProv)
      {
          // The provider's answer is authoritative. Returning it here avoids discarding the
          // result and then counting the source a second time (or reporting -1) below.
          return await listProv.GetCountAsync(onlyIfCheap, cancellationToken).ConfigureAwait(false);
      }

      if (onlyIfCheap && !(_source is ICollection<TElement>) && !(_source is ICollection))
      {
          return -1;
      }

      return await _source.CountAsync(cancellationToken).ConfigureAwait(false);
  }
  /// <summary>
  /// Gets the number of elements that fall within sorted positions
  /// <paramref name="minIndexInclusive"/> through <paramref name="maxIndexInclusive"/>.
  /// </summary>
  /// <returns>
  /// The total count unchanged when it is zero or negative, 0 when the window starts past the
  /// end, otherwise the size of the window clamped to the available elements.
  /// </returns>
  internal async ValueTask<int> GetCountAsync(int minIndexInclusive, int maxIndexInclusive, bool onlyIfCheap, CancellationToken cancellationToken)
  {
      var total = await GetCountAsync(onlyIfCheap, cancellationToken).ConfigureAwait(false);

      if (total <= 0)
      {
          return total;
      }

      if (minIndexInclusive >= total)
      {
          return 0;
      }

      var endExclusive = total <= maxIndexInclusive ? total : maxIndexInclusive + 1;
      return endExclusive - minIndexInclusive;
  }
  /// <summary>
  /// Computes the fully sorted index map for the first <paramref name="count"/> entries of <paramref name="elements"/>.
  /// </summary>
  private ValueTask<int[]> SortedMap(TElement[] elements, int count, CancellationToken cancellationToken) =>
      GetAsyncEnumerableSorter(cancellationToken).Sort(elements, count);
  /// <summary>
  /// Computes an index map for the first <paramref name="count"/> entries of <paramref name="elements"/>
  /// using the sorter's range-limited sort for the given window of positions.
  /// </summary>
  private ValueTask<int[]> SortedMap(TElement[] elements, int count, int minIndexInclusive, int maxIndexInclusive, CancellationToken cancellationToken) =>
      GetAsyncEnumerableSorter(cancellationToken).Sort(elements, count, minIndexInclusive, maxIndexInclusive);
  /// <summary>
  /// Builds the caching comparer for this ordering with no child comparer.
  /// </summary>
  private AsyncCachingComparer<TElement> GetComparer()
  {
      return GetComparer(null);
  }

  /// <summary>
  /// Builds the caching comparer for this ordering, using <paramref name="childComparer"/>
  /// (which may be <c>null</c>) as its child.
  /// </summary>
  internal abstract AsyncCachingComparer<TElement> GetComparer(AsyncCachingComparer<TElement> childComparer);
  /// <summary>
  /// Finds the element that sorts first by scanning the source once, without buffering it.
  /// </summary>
  /// <param name="cancellationToken">Token passed to enumeration and comparisons.</param>
  /// <returns>A default-constructed <c>Maybe</c> when the source is empty; otherwise the selected element.</returns>
  public async ValueTask<Maybe<TElement>> TryGetFirstAsync(CancellationToken cancellationToken)
  {
      IAsyncEnumerator<TElement> enumerator = _source.GetAsyncEnumerator(cancellationToken);

      try
      {
          var hasFirst = await enumerator.MoveNextAsync().ConfigureAwait(false);
          if (!hasFirst)
          {
              return new Maybe<TElement>();
          }

          TElement best = enumerator.Current;

          AsyncCachingComparer<TElement> comparer = GetComparer();
          await comparer.SetElement(best, cancellationToken).ConfigureAwait(false);

          while (await enumerator.MoveNextAsync().ConfigureAwait(false))
          {
              TElement candidate = enumerator.Current;

              // Strictly-less only, so the earliest of equal elements is kept.
              var order = await comparer.Compare(candidate, cacheLower: true, cancellationToken).ConfigureAwait(false);
              if (order < 0)
              {
                  best = candidate;
              }
          }

          return new Maybe<TElement>(best);
      }
      finally
      {
          await enumerator.DisposeAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Finds the element that sorts last by scanning the source once, without buffering it.
  /// </summary>
  /// <param name="cancellationToken">Token passed to enumeration and comparisons.</param>
  /// <returns>A default-constructed <c>Maybe</c> when the source is empty; otherwise the selected element.</returns>
  public async ValueTask<Maybe<TElement>> TryGetLastAsync(CancellationToken cancellationToken)
  {
      IAsyncEnumerator<TElement> enumerator = _source.GetAsyncEnumerator(cancellationToken);

      try
      {
          var hasFirst = await enumerator.MoveNextAsync().ConfigureAwait(false);
          if (!hasFirst)
          {
              return new Maybe<TElement>();
          }

          TElement best = enumerator.Current;

          AsyncCachingComparer<TElement> comparer = GetComparer();
          await comparer.SetElement(best, cancellationToken).ConfigureAwait(false);

          while (await enumerator.MoveNextAsync().ConfigureAwait(false))
          {
              TElement candidate = enumerator.Current;

              // Greater-or-equal, so the latest of equal elements is kept.
              var order = await comparer.Compare(candidate, cacheLower: false, cancellationToken).ConfigureAwait(false);
              if (order >= 0)
              {
                  best = candidate;
              }
          }

          return new Maybe<TElement>(best);
      }
      finally
      {
          await enumerator.DisposeAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Finds the last element of the window of sorted positions
  /// <paramref name="minIndexInclusive"/> through <paramref name="maxIndexInclusive"/>.
  /// </summary>
  /// <returns>
  /// A default-constructed <c>Maybe</c> when the window starts past the end of the source;
  /// otherwise the element at the window's upper bound, or the overall last element when the
  /// upper bound reaches or exceeds the final position.
  /// </returns>
  internal async ValueTask<Maybe<TElement>> TryGetLastAsync(int minIndexInclusive, int maxIndexInclusive, CancellationToken cancellationToken)
  {
      var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
      var length = buffered.Length;

      if (length <= minIndexInclusive)
      {
          return new Maybe<TElement>();
      }

      var items = buffered.Array;

      TElement found = maxIndexInclusive < length - 1
          ? await GetAsyncEnumerableSorter(cancellationToken).ElementAt(items, length, maxIndexInclusive).ConfigureAwait(false)
          : await Last(items, length, cancellationToken).ConfigureAwait(false);

      return new Maybe<TElement>(found);
  }
  /// <summary>
  /// Scans the first <paramref name="count"/> entries of <paramref name="items"/> and returns
  /// the one that sorts last. Reads <c>items[0]</c> unconditionally.
  /// </summary>
  private async ValueTask<TElement> Last(TElement[] items, int count, CancellationToken cancellationToken)
  {
      TElement best = items[0];

      AsyncCachingComparer<TElement> comparer = GetComparer();
      await comparer.SetElement(best, cancellationToken).ConfigureAwait(false);

      var position = 1;
      while (position != count)
      {
          TElement candidate = items[position];

          // Greater-or-equal, so the latest of equal elements is kept.
          var order = await comparer.Compare(candidate, cacheLower: false, cancellationToken).ConfigureAwait(false);
          if (order >= 0)
          {
              best = candidate;
          }

          ++position;
      }

      return best;
  }
  /// <summary>
  /// Creates a partition over this ordering that starts at sorted position <paramref name="count"/>
  /// and has no upper bound other than <see cref="int.MaxValue"/>.
  /// </summary>
  public IAsyncPartition<TElement> Skip(int count)
  {
      return new OrderedAsyncPartition<TElement>(this, count, int.MaxValue);
  }
  /// <summary>
  /// Creates a partition over this ordering covering sorted positions 0 through <paramref name="count"/> - 1.
  /// </summary>
  public IAsyncPartition<TElement> Take(int count)
  {
      return new OrderedAsyncPartition<TElement>(this, 0, count - 1);
  }
  /// <summary>
  /// Gets the element at sorted position <paramref name="index"/>.
  /// </summary>
  /// <param name="index">Zero-based sorted position.</param>
  /// <param name="cancellationToken">Token passed to buffering and selection.</param>
  /// <returns>
  /// A default-constructed <c>Maybe</c> when <paramref name="index"/> is negative or past the
  /// end of the source; otherwise the selected element.
  /// </returns>
  public ValueTask<Maybe<TElement>> TryGetElementAtAsync(int index, CancellationToken cancellationToken)
  {
      if (index < 0)
      {
          return new ValueTask<Maybe<TElement>>(new Maybe<TElement>());
      }

      // Position 0 can be answered with a single scan instead of buffering the source.
      return index == 0 ? TryGetFirstAsync(cancellationToken) : Core();

      async ValueTask<Maybe<TElement>> Core()
      {
          var buffered = await AsyncEnumerableHelpers.ToArrayWithLength(_source, cancellationToken).ConfigureAwait(false);
          var length = buffered.Length;

          if (index >= length)
          {
              return new Maybe<TElement>();
          }

          AsyncEnumerableSorter<TElement> selector = GetAsyncEnumerableSorter(cancellationToken);
          TElement found = await selector.ElementAt(buffered.Array, length, index).ConfigureAwait(false);

          return new Maybe<TElement>(found);
      }
  }
  310. }
  /// <summary>
  /// Ordering step whose sort key is produced by a synchronous key selector.
  /// An optional parent ordering is consulted when building sorters and comparers.
  /// </summary>
  internal sealed class OrderedAsyncEnumerable<TElement, TKey> : OrderedAsyncEnumerable<TElement>
  {
      private readonly Func<TElement, TKey> _keySelector;
      private readonly IComparer<TKey> _comparer;
      private readonly bool _descending;
      private readonly OrderedAsyncEnumerable<TElement> _parent;

      /// <summary>
      /// Captures the ordering parameters; a <c>null</c> comparer falls back to <see cref="Comparer{T}.Default"/>.
      /// </summary>
      public OrderedAsyncEnumerable(IAsyncEnumerable<TElement> source, Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending, OrderedAsyncEnumerable<TElement> parent)
          : base(source)
      {
          Debug.Assert(keySelector != null);

          _parent = parent;
          _descending = descending;
          _comparer = comparer ?? Comparer<TKey>.Default;
          _keySelector = keySelector;
      }

      /// <summary>Creates a new instance with the same source, selector, comparer, direction and parent.</summary>
      public override AsyncIteratorBase<TElement> Clone() =>
          new OrderedAsyncEnumerable<TElement, TKey>(_source, _keySelector, _comparer, _descending, _parent);

      /// <summary>
      /// Wraps <paramref name="next"/> in a sorter for this step and, when a parent exists,
      /// lets the parent wrap that sorter in turn.
      /// </summary>
      internal override AsyncEnumerableSorter<TElement> GetAsyncEnumerableSorter(AsyncEnumerableSorter<TElement> next, CancellationToken cancellationToken)
      {
          AsyncEnumerableSorter<TElement> own = new SyncKeySelectorAsyncEnumerableSorter<TElement, TKey>(_keySelector, _comparer, _descending, next);

          return _parent == null
              ? own
              : _parent.GetAsyncEnumerableSorter(own, cancellationToken);
      }

      /// <summary>
      /// Builds a caching comparer for this step (with or without a child) and, when a parent
      /// exists, lets the parent wrap it in turn.
      /// </summary>
      internal override AsyncCachingComparer<TElement> GetComparer(AsyncCachingComparer<TElement> childComparer)
      {
          AsyncCachingComparer<TElement> own;
          if (childComparer != null)
          {
              own = new AsyncCachingComparerWithChild<TElement, TKey>(_keySelector, _comparer, _descending, childComparer);
          }
          else
          {
              own = new AsyncCachingComparer<TElement, TKey>(_keySelector, _comparer, _descending);
          }

          return _parent == null ? own : _parent.GetComparer(own);
      }
  }
  /// <summary>
  /// Ordering step whose sort key is produced by an asynchronous (ValueTask-returning) key selector.
  /// An optional parent ordering is consulted when building sorters and comparers.
  /// </summary>
  internal sealed class OrderedAsyncEnumerableWithTask<TElement, TKey> : OrderedAsyncEnumerable<TElement>
  {
      private readonly Func<TElement, ValueTask<TKey>> _keySelector;
      private readonly IComparer<TKey> _comparer;
      private readonly bool _descending;
      private readonly OrderedAsyncEnumerable<TElement> _parent;

      /// <summary>
      /// Captures the ordering parameters; a <c>null</c> comparer falls back to <see cref="Comparer{T}.Default"/>.
      /// </summary>
      public OrderedAsyncEnumerableWithTask(IAsyncEnumerable<TElement> source, Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, OrderedAsyncEnumerable<TElement> parent)
          : base(source)
      {
          Debug.Assert(keySelector != null);

          _parent = parent;
          _descending = descending;
          _comparer = comparer ?? Comparer<TKey>.Default;
          _keySelector = keySelector;
      }

      /// <summary>Creates a new instance with the same source, selector, comparer, direction and parent.</summary>
      public override AsyncIteratorBase<TElement> Clone() =>
          new OrderedAsyncEnumerableWithTask<TElement, TKey>(_source, _keySelector, _comparer, _descending, _parent);

      /// <summary>
      /// Wraps <paramref name="next"/> in a sorter for this step and, when a parent exists,
      /// lets the parent wrap that sorter in turn.
      /// </summary>
      internal override AsyncEnumerableSorter<TElement> GetAsyncEnumerableSorter(AsyncEnumerableSorter<TElement> next, CancellationToken cancellationToken)
      {
          AsyncEnumerableSorter<TElement> own = new AsyncKeySelectorAsyncEnumerableSorter<TElement, TKey>(_keySelector, _comparer, _descending, next);

          return _parent == null
              ? own
              : _parent.GetAsyncEnumerableSorter(own, cancellationToken);
      }

      /// <summary>
      /// Builds a caching comparer for this step (with or without a child) and, when a parent
      /// exists, lets the parent wrap it in turn.
      /// </summary>
      internal override AsyncCachingComparer<TElement> GetComparer(AsyncCachingComparer<TElement> childComparer)
      {
          AsyncCachingComparer<TElement> own;
          if (childComparer != null)
          {
              own = new AsyncCachingComparerWithTaskAndChild<TElement, TKey>(_keySelector, _comparer, _descending, childComparer);
          }
          else
          {
              own = new AsyncCachingComparerWithTask<TElement, TKey>(_keySelector, _comparer, _descending);
          }

          return _parent == null ? own : _parent.GetComparer(own);
      }
  }
  383. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Ordering step whose sort key is produced by an asynchronous key selector that also
  /// receives a cancellation token. An optional parent ordering is consulted when building
  /// sorters and comparers.
  /// </summary>
  internal sealed class OrderedAsyncEnumerableWithTaskAndCancellation<TElement, TKey> : OrderedAsyncEnumerable<TElement>
  {
      private readonly Func<TElement, CancellationToken, ValueTask<TKey>> _keySelector;
      private readonly IComparer<TKey> _comparer;
      private readonly bool _descending;
      private readonly OrderedAsyncEnumerable<TElement> _parent;

      /// <summary>
      /// Captures the ordering parameters; a <c>null</c> comparer falls back to <see cref="Comparer{T}.Default"/>.
      /// </summary>
      public OrderedAsyncEnumerableWithTaskAndCancellation(IAsyncEnumerable<TElement> source, Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, OrderedAsyncEnumerable<TElement> parent)
          : base(source)
      {
          Debug.Assert(keySelector != null);

          _parent = parent;
          _descending = descending;
          _comparer = comparer ?? Comparer<TKey>.Default;
          _keySelector = keySelector;
      }

      /// <summary>Creates a new instance with the same source, selector, comparer, direction and parent.</summary>
      public override AsyncIteratorBase<TElement> Clone() =>
          new OrderedAsyncEnumerableWithTaskAndCancellation<TElement, TKey>(_source, _keySelector, _comparer, _descending, _parent);

      /// <summary>
      /// Wraps <paramref name="next"/> in a sorter for this step (which receives the token) and,
      /// when a parent exists, lets the parent wrap that sorter in turn.
      /// </summary>
      internal override AsyncEnumerableSorter<TElement> GetAsyncEnumerableSorter(AsyncEnumerableSorter<TElement> next, CancellationToken cancellationToken)
      {
          AsyncEnumerableSorter<TElement> own = new AsyncKeySelectorAsyncEnumerableSorterWithCancellation<TElement, TKey>(_keySelector, _comparer, _descending, next, cancellationToken);

          return _parent == null
              ? own
              : _parent.GetAsyncEnumerableSorter(own, cancellationToken);
      }

      /// <summary>
      /// Builds a caching comparer for this step (with or without a child) and, when a parent
      /// exists, lets the parent wrap it in turn.
      /// </summary>
      internal override AsyncCachingComparer<TElement> GetComparer(AsyncCachingComparer<TElement> childComparer)
      {
          AsyncCachingComparer<TElement> own;
          if (childComparer != null)
          {
              own = new AsyncCachingComparerWithTaskAndCancellationAndChild<TElement, TKey>(_keySelector, _comparer, _descending, childComparer);
          }
          else
          {
              own = new AsyncCachingComparerWithTaskAndCancellation<TElement, TKey>(_keySelector, _comparer, _descending);
          }

          return _parent == null ? own : _parent.GetComparer(own);
      }
  }
  420. #endif
  /// <summary>
  /// Base contract for sorters that compute keys for a buffer of elements and then order an
  /// index map over that buffer. The map holds indexes into the element array.
  /// </summary>
  internal abstract class AsyncEnumerableSorter<TElement>
  {
      /// <summary>Computes the sort keys for the first <paramref name="count"/> elements.</summary>
      internal abstract ValueTask ComputeKeys(TElement[] elements, int count);

      /// <summary>Compares the keys of the elements at the two given element indexes.</summary>
      internal abstract int CompareAnyKeys(int index1, int index2);

      /// <summary>Computes keys and returns an index map sorted over its whole range.</summary>
      public async ValueTask<int[]> Sort(TElement[] elements, int count)
      {
          var map = await ComputeMap(elements, count).ConfigureAwait(false);

          QuickSort(map, 0, count - 1);

          return map;
      }

      /// <summary>
      /// Computes keys and returns an index map processed by <see cref="PartialQuickSort"/>
      /// for the given window of positions.
      /// </summary>
      public async ValueTask<int[]> Sort(TElement[] elements, int count, int minIndexInclusive, int maxIndexInclusive)
      {
          var map = await ComputeMap(elements, count).ConfigureAwait(false);

          PartialQuickSort(map, 0, count - 1, minIndexInclusive, maxIndexInclusive);

          return map;
      }

      /// <summary>
      /// Computes keys and returns the element for sorted position <paramref name="index"/>,
      /// using <see cref="Min"/> for position 0 and <see cref="QuickSelect"/> otherwise.
      /// </summary>
      public async ValueTask<TElement> ElementAt(TElement[] elements, int count, int index)
      {
          var map = await ComputeMap(elements, count).ConfigureAwait(false);

          var selected = index == 0
              ? Min(map, count)
              : QuickSelect(map, count - 1, index);

          return elements[selected];
      }

      /// <summary>Computes the keys and creates the identity map 0, 1, ..., count - 1.</summary>
      private async ValueTask<int[]> ComputeMap(TElement[] elements, int count)
      {
          await ComputeKeys(elements, count).ConfigureAwait(false);

          var identity = new int[count];
          var position = 0;
          while (position < count)
          {
              identity[position] = position;
              position++;
          }

          return identity;
      }

      /// <summary>Sorts <c>map[left..right]</c> in place.</summary>
      protected abstract void QuickSort(int[] map, int left, int right);

      /// <summary>Sorts <c>map[left..right]</c> in place for the given window of positions.</summary>
      protected abstract void PartialQuickSort(int[] map, int left, int right, int minIndexInclusive, int maxIndexInclusive);

      /// <summary>Returns the map entry for sorted position <paramref name="idx"/> within <c>map[0..right]</c>.</summary>
      protected abstract int QuickSelect(int[] map, int right, int idx);

      /// <summary>Returns the map entry that sorts first among the first <paramref name="count"/> entries.</summary>
      protected abstract int Min(int[] map, int count);
  }
  459. internal abstract class AsyncEnumerableSorterBase<TElement, TKey> : AsyncEnumerableSorter<TElement>
  460. {
  461. private readonly IComparer<TKey> _comparer;
  462. private readonly bool _descending;
  463. protected readonly AsyncEnumerableSorter<TElement> _next;
  464. protected TKey[] _keys;
  /// <summary>
  /// Stores the key comparer, the sort direction and the optional next sorter used to break ties.
  /// </summary>
  public AsyncEnumerableSorterBase(IComparer<TKey> comparer, bool descending, AsyncEnumerableSorter<TElement> next)
  {
      _next = next;
      _descending = descending;
      _comparer = comparer;
  }
  /// <summary>
  /// Compares the keys at two element indexes. Equal keys are resolved by the next sorter
  /// when one exists, otherwise by the difference of the indexes.
  /// </summary>
  /// <returns>1 or -1 for differing keys (direction applied); the tie-break result for equal keys.</returns>
  internal sealed override int CompareAnyKeys(int index1, int index2)
  {
      var order = _comparer.Compare(_keys[index1], _keys[index2]);

      if (order != 0)
      {
          // A "greater" result counts as -1 when descending, as 1 when ascending.
          return (order > 0) == _descending ? -1 : 1;
      }

      return _next != null
          ? _next.CompareAnyKeys(index1, index2)
          : index1 - index2;
  }
  /// <summary>
  /// Compares two element indexes, short-circuiting to 0 when they are the same index.
  /// </summary>
  private int CompareKeys(int index1, int index2)
  {
      if (index1 == index2)
      {
          return 0;
      }

      return CompareAnyKeys(index1, index2);
  }
  /// <summary>
  /// Sorts <c>map[left..right]</c> in place using a middle-pivot quicksort. After each
  /// partition it recurses into the side that is not larger and loops on the other side.
  /// </summary>
  protected override void QuickSort(int[] map, int left, int right)
  {
      // REVIEW: Consider using Array.Sort, see https://github.com/dotnet/corefx/commit/a6aff797a33e606a60ec0c9ca034a161c609620f#diff-d90239bd8284188e2bd210790483f5ca.

      do
      {
          var lo = left;
          var hi = right;
          var pivot = map[lo + ((hi - lo) >> 1)];

          do
          {
              // Advance lo past entries that sort before the pivot.
              while (lo < map.Length && CompareKeys(pivot, map[lo]) > 0)
              {
                  lo++;
              }

              // Retreat hi past entries that sort after the pivot.
              while (hi >= 0 && CompareKeys(pivot, map[hi]) < 0)
              {
                  hi--;
              }

              if (lo > hi)
              {
                  break;
              }

              if (lo < hi)
              {
                  var swapped = map[lo];
                  map[lo] = map[hi];
                  map[hi] = swapped;
              }

              lo++;
              hi--;
          }
          while (lo <= hi);

          if (hi - left <= right - lo)
          {
              if (left < hi)
              {
                  QuickSort(map, left, hi);
              }

              left = lo;
          }
          else
          {
              if (lo < right)
              {
                  QuickSort(map, lo, right);
              }

              right = hi;
          }
      }
      while (left < right);
  }
  /// <summary>
  /// Quicksort variant over <c>map[left..right]</c> that is given the window of positions
  /// (<paramref name="minIndexInclusive"/>..<paramref name="maxIndexInclusive"/>) the caller
  /// will read, and adjusts its bounds against that window after each partition.
  /// </summary>
  protected override void PartialQuickSort(int[] map, int left, int right, int minIndexInclusive, int maxIndexInclusive)
  {
      do
      {
          var lo = left;
          var hi = right;
          var pivot = map[lo + ((hi - lo) >> 1)];

          do
          {
              // Advance lo past entries that sort before the pivot.
              while (lo < map.Length && CompareKeys(pivot, map[lo]) > 0)
              {
                  lo++;
              }

              // Retreat hi past entries that sort after the pivot.
              while (hi >= 0 && CompareKeys(pivot, map[hi]) < 0)
              {
                  hi--;
              }

              if (lo > hi)
              {
                  break;
              }

              if (lo < hi)
              {
                  var swapped = map[lo];
                  map[lo] = map[hi];
                  map[hi] = swapped;
              }

              lo++;
              hi--;
          }
          while (lo <= hi);

          // Bound adjustment against the requested window; kept exactly as in the original.
          if (minIndexInclusive >= lo)
          {
              left = lo + 1;
          }
          else if (maxIndexInclusive <= hi)
          {
              right = hi - 1;
          }

          if (hi - left <= right - lo)
          {
              if (left < hi)
              {
                  PartialQuickSort(map, left, hi, minIndexInclusive, maxIndexInclusive);
              }

              left = lo;
          }
          else
          {
              if (lo < right)
              {
                  PartialQuickSort(map, lo, right, minIndexInclusive, maxIndexInclusive);
              }

              right = hi;
          }
      }
      while (left < right);
  }
  /// <summary>
  /// Repeatedly partitions <c>map[0..right]</c> around a middle pivot, narrowing the bounds
  /// relative to <paramref name="idx"/>, and finally returns <c>map[idx]</c>.
  /// </summary>
  protected override int QuickSelect(int[] map, int right, int idx)
  {
      var left = 0;

      do
      {
          var lo = left;
          var hi = right;
          var pivot = map[lo + ((hi - lo) >> 1)];

          do
          {
              // Advance lo past entries that sort before the pivot.
              while (lo < map.Length && CompareKeys(pivot, map[lo]) > 0)
              {
                  lo++;
              }

              // Retreat hi past entries that sort after the pivot.
              while (hi >= 0 && CompareKeys(pivot, map[hi]) < 0)
              {
                  hi--;
              }

              if (lo > hi)
              {
                  break;
              }

              if (lo < hi)
              {
                  var swapped = map[lo];
                  map[lo] = map[hi];
                  map[hi] = swapped;
              }

              lo++;
              hi--;
          }
          while (lo <= hi);

          // Bound adjustment relative to the wanted position; kept exactly as in the original.
          if (lo <= idx)
          {
              left = lo + 1;
          }
          else
          {
              right = hi - 1;
          }

          if (hi - left <= right - lo)
          {
              if (left < hi)
              {
                  right = hi;
              }

              left = lo;
          }
          else
          {
              if (lo < right)
              {
                  left = lo;
              }

              right = hi;
          }
      }
      while (left < right);

      return map[idx];
  }
  /// <summary>
  /// Linearly scans the first <paramref name="count"/> map entries and returns the entry
  /// that sorts first; on ties the earlier entry is kept.
  /// </summary>
  protected override int Min(int[] map, int count)
  {
      var best = 0;

      var candidate = 1;
      while (candidate < count)
      {
          if (CompareKeys(map[candidate], map[best]) < 0)
          {
              best = candidate;
          }

          candidate++;
      }

      return map[best];
  }
  665. }
  /// <summary>
  /// Sorter whose keys are produced by a synchronous key selector.
  /// </summary>
  internal sealed class SyncKeySelectorAsyncEnumerableSorter<TElement, TKey> : AsyncEnumerableSorterBase<TElement, TKey>
  {
      private readonly Func<TElement, TKey> _keySelector;

      /// <summary>Stores the selector and forwards comparer, direction and next sorter to the base.</summary>
      public SyncKeySelectorAsyncEnumerableSorter(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending, AsyncEnumerableSorter<TElement> next)
          : base(comparer, descending, next)
      {
          _keySelector = keySelector;
      }

      /// <summary>
      /// Fills the key array for the first <paramref name="count"/> elements, then asks the
      /// next sorter (if any) to compute its keys too.
      /// </summary>
      internal override async ValueTask ComputeKeys(TElement[] elements, int count)
      {
          // The field is assigned before filling, so it always refers to the array being populated.
          var keys = _keys = new TKey[count];

          var position = 0;
          while (position < count)
          {
              keys[position] = _keySelector(elements[position]);
              position++;
          }

          if (_next == null)
          {
              return;
          }

          await _next.ComputeKeys(elements, count).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Sorter whose keys are produced by an asynchronous (ValueTask-returning) key selector,
  /// awaited one element at a time.
  /// </summary>
  internal sealed class AsyncKeySelectorAsyncEnumerableSorter<TElement, TKey> : AsyncEnumerableSorterBase<TElement, TKey>
  {
      private readonly Func<TElement, ValueTask<TKey>> _keySelector;

      /// <summary>Stores the selector and forwards comparer, direction and next sorter to the base.</summary>
      public AsyncKeySelectorAsyncEnumerableSorter(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, AsyncEnumerableSorter<TElement> next)
          : base(comparer, descending, next)
      {
          _keySelector = keySelector;
      }

      /// <summary>
      /// Fills the key array for the first <paramref name="count"/> elements, then asks the
      /// next sorter (if any) to compute its keys too.
      /// </summary>
      internal override async ValueTask ComputeKeys(TElement[] elements, int count)
      {
          // The field is assigned before filling, so it always refers to the array being populated.
          var keys = _keys = new TKey[count];

          var position = 0;
          while (position < count)
          {
              keys[position] = await _keySelector(elements[position]).ConfigureAwait(false);
              position++;
          }

          if (_next == null)
          {
              return;
          }

          await _next.ComputeKeys(elements, count).ConfigureAwait(false);
      }
  }
  708. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Sorter whose keys are produced by an asynchronous key selector that receives the
  /// cancellation token supplied at construction, awaited one element at a time.
  /// </summary>
  internal sealed class AsyncKeySelectorAsyncEnumerableSorterWithCancellation<TElement, TKey> : AsyncEnumerableSorterBase<TElement, TKey>
  {
      private readonly CancellationToken _cancellationToken;
      private readonly Func<TElement, CancellationToken, ValueTask<TKey>> _keySelector;

      /// <summary>Stores the selector and token, and forwards comparer, direction and next sorter to the base.</summary>
      public AsyncKeySelectorAsyncEnumerableSorterWithCancellation(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, AsyncEnumerableSorter<TElement> next, CancellationToken cancellationToken)
          : base(comparer, descending, next)
      {
          _cancellationToken = cancellationToken;
          _keySelector = keySelector;
      }

      /// <summary>
      /// Fills the key array for the first <paramref name="count"/> elements, then asks the
      /// next sorter (if any) to compute its keys too.
      /// </summary>
      internal override async ValueTask ComputeKeys(TElement[] elements, int count)
      {
          // The field is assigned before filling, so it always refers to the array being populated.
          var keys = _keys = new TKey[count];

          var position = 0;
          while (position < count)
          {
              keys[position] = await _keySelector(elements[position], _cancellationToken).ConfigureAwait(false);
              position++;
          }

          if (_next == null)
          {
              return;
          }

          await _next.ComputeKeys(elements, count).ConfigureAwait(false);
      }
  }
  732. #endif
  /// <summary>
  /// A window over the sorted output of an <see cref="OrderedAsyncEnumerable{TElement}"/>,
  /// described by an inclusive index range [min, max]. Most operations are forwarded to the
  /// source together with that range.
  /// </summary>
  internal sealed class OrderedAsyncPartition<TElement> : AsyncIterator<TElement>, IAsyncPartition<TElement>
  {
      private readonly OrderedAsyncEnumerable<TElement> _source;
      private readonly int _minIndexInclusive;
      private readonly int _maxIndexInclusive;

      /// <summary>
      /// Creates a partition over <paramref name="source"/> covering the inclusive index range
      /// from <paramref name="minIndexInclusive"/> to <paramref name="maxIndexInclusive"/>.
      /// </summary>
      public OrderedAsyncPartition(OrderedAsyncEnumerable<TElement> source, int minIndexInclusive, int maxIndexInclusive)
      {
          _source = source;
          _minIndexInclusive = minIndexInclusive;
          _maxIndexInclusive = maxIndexInclusive;
      }

      /// <summary>Creates a new partition over the same source and the same index range.</summary>
      public override AsyncIteratorBase<TElement> Clone() => new OrderedAsyncPartition<TElement>(_source, _minIndexInclusive, _maxIndexInclusive);

      /// <summary>Forwards the count request to the source, restricted to this partition's range.</summary>
      public ValueTask<int> GetCountAsync(bool onlyIfCheap, CancellationToken cancellationToken) =>
          _source.GetCountAsync(_minIndexInclusive, _maxIndexInclusive, onlyIfCheap, cancellationToken);

      /// <summary>
      /// Returns a partition whose lower bound is moved forward by <paramref name="count"/>.
      /// </summary>
      public IAsyncPartition<TElement> Skip(int count)
      {
          int minIndex = unchecked(_minIndexInclusive + count);

          // Unsigned comparison: a lower bound past the upper bound, or one that wrapped
          // around to a negative value, both yield the empty iterator.
          if (unchecked((uint)minIndex > (uint)_maxIndexInclusive))
          {
              return AsyncEnumerable.EmptyAsyncIterator<TElement>.Instance;
          }

          return new OrderedAsyncPartition<TElement>(_source, minIndex, _maxIndexInclusive);
      }

      /// <summary>
      /// Returns a partition limited to at most <paramref name="count"/> elements from the current lower bound.
      /// </summary>
      public IAsyncPartition<TElement> Take(int count)
      {
          int maxIndex = unchecked(_minIndexInclusive + count - 1);

          // Unsigned comparison: when the requested end is at or beyond the current upper
          // bound (or wrapped around to a negative value) the partition is returned unchanged.
          if (unchecked((uint)maxIndex >= (uint)_maxIndexInclusive))
          {
              return this;
          }

          return new OrderedAsyncPartition<TElement>(_source, _minIndexInclusive, maxIndex);
      }

      /// <summary>Forwards to the source's range-restricted array materialization.</summary>
      public ValueTask<TElement[]> ToArrayAsync(CancellationToken cancellationToken) =>
          _source.ToArrayAsync(_minIndexInclusive, _maxIndexInclusive, cancellationToken);

      /// <summary>Forwards to the source's range-restricted list materialization.</summary>
      public ValueTask<List<TElement>> ToListAsync(CancellationToken cancellationToken) =>
          _source.ToListAsync(_minIndexInclusive, _maxIndexInclusive, cancellationToken);

      /// <summary>
      /// Looks up the element at <paramref name="index"/> relative to the partition's lower bound.
      /// An index outside [0, max - min] produces a default-constructed <c>Maybe</c>
      /// (presumably meaning "no value"; confirm against the <c>Maybe</c> type).
      /// </summary>
      public ValueTask<Maybe<TElement>> TryGetElementAtAsync(int index, CancellationToken cancellationToken)
      {
          // Unsigned comparison rejects negative indexes as well as indexes past the window.
          if (unchecked((uint)index <= (uint)(_maxIndexInclusive - _minIndexInclusive)))
          {
              return _source.TryGetElementAtAsync(index + _minIndexInclusive, cancellationToken);
          }

          return new ValueTask<Maybe<TElement>>(new Maybe<TElement>());
      }

      /// <summary>Asks the source for the element at the partition's lower bound.</summary>
      public ValueTask<Maybe<TElement>> TryGetFirstAsync(CancellationToken cancellationToken) =>
          _source.TryGetElementAtAsync(_minIndexInclusive, cancellationToken);

      /// <summary>Forwards to the source's range-restricted "last element" lookup.</summary>
      public ValueTask<Maybe<TElement>> TryGetLastAsync(CancellationToken cancellationToken) =>
          _source.TryGetLastAsync(_minIndexInclusive, _maxIndexInclusive, cancellationToken);

      // REVIEW: Consider to tear off an iterator object rather than storing this state here?

      private TElement[] _buffer;      // materialized source elements
      private int[] _indexes;          // index map returned by the sorter's Sort call
      private int _minIndexIterator;   // next position to yield
      private int _maxIndexIterator;   // last position to yield (inclusive)

      /// <summary>
      /// Advances the iterator. The first call materializes the source, clamps the range to the
      /// number of buffered elements, and either fetches a single element or sorts the range;
      /// later calls walk the index map until the range is exhausted.
      /// </summary>
      /// <returns><c>true</c> when <c>_current</c> was set to a new element; otherwise <c>false</c>.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _buffer = await _source.ToArrayAsync(_cancellationToken).ConfigureAwait(false); // TODO: Use buffer.

                  _minIndexIterator = _minIndexInclusive;
                  _maxIndexIterator = _maxIndexInclusive;

                  int count = _buffer.Length;

                  if (count > _minIndexIterator)
                  {
                      // Clamp the upper bound to the last buffered element.
                      if (count <= _maxIndexIterator)
                      {
                          _maxIndexIterator = count - 1;
                      }

                      AsyncEnumerableSorter<TElement> sorter = _source.GetAsyncEnumerableSorter(_cancellationToken);

                      if (_minIndexIterator == _maxIndexIterator)
                      {
                          // Single-element window: ask the sorter for just that element.
                          _current = await sorter.ElementAt(_buffer, _buffer.Length, _minIndexIterator).ConfigureAwait(false);

                          // Make the range empty so the next call falls through to disposal.
                          _minIndexIterator = int.MaxValue;
                          _maxIndexIterator = int.MinValue;

                          _state = AsyncIteratorState.Iterating;
                          return true;
                      }
                      else
                      {
                          _indexes = await sorter.Sort(_buffer, _buffer.Length, _minIndexIterator, _maxIndexIterator).ConfigureAwait(false);
                      }

                      _state = AsyncIteratorState.Iterating;
                      goto case AsyncIteratorState.Iterating;
                  }

                  // The window starts past the end of the buffer: nothing to yield.
                  // ConfigureAwait(false) matches every other await in this iterator.
                  await DisposeAsync().ConfigureAwait(false);
                  break;

              case AsyncIteratorState.Iterating:
                  if (_minIndexIterator <= _maxIndexIterator)
                  {
                      _current = _buffer[_indexes[_minIndexIterator++]];
                      return true;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }

      /// <summary>Drops the buffered elements and the index map, then disposes the base iterator.</summary>
      public override async ValueTask DisposeAsync()
      {
          _buffer = null;
          _indexes = null;

          await base.DisposeAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Base contract for comparers that keep a cached reference element and compare
  /// incoming elements against it.
  /// </summary>
  internal abstract class AsyncCachingComparer<TElement>
  {
      /// <summary>
      /// Makes <paramref name="element"/> the cached reference element.
      /// </summary>
      internal abstract ValueTask SetElement(TElement element, CancellationToken cancellationToken);

      /// <summary>
      /// Compares <paramref name="element"/> against the cached reference element and yields the
      /// comparison result; <paramref name="cacheLower"/> tells the implementation which outcome
      /// should replace the cached element.
      /// </summary>
      internal abstract ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken);
  }
  /// <summary>
  /// Caching comparer that extracts keys with a synchronous selector and remembers the key
  /// of the current reference element.
  /// </summary>
  internal class AsyncCachingComparer<TElement, TKey> : AsyncCachingComparer<TElement>
  {
      protected readonly Func<TElement, TKey> _keySelector;
      protected readonly IComparer<TKey> _comparer;
      protected readonly bool _descending;
      protected TKey _lastKey;

      /// <summary>Stores the key selector, the key comparer, and the ordering direction.</summary>
      public AsyncCachingComparer(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending)
      {
          _descending = descending;
          _comparer = comparer;
          _keySelector = keySelector;
      }

      /// <summary>
      /// Compares the key of <paramref name="element"/> with the cached key (operands swapped
      /// when descending). The cached key is replaced when "result is negative" equals
      /// <paramref name="cacheLower"/>.
      /// </summary>
      /// <returns>The comparison result, wrapped in an already-completed task.</returns>
      internal override ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = _keySelector(element);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          bool isLower = result < 0;
          if (isLower == cacheLower)
          {
              _lastKey = candidate;
          }

          return new ValueTask<int>(result);
      }

      /// <summary>Caches the key of <paramref name="element"/> as the new reference key.</summary>
      internal override ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          TKey key = _keySelector(element);
          _lastKey = key;

          return default(ValueTask);
      }
  }
  /// <summary>
  /// Synchronous-selector caching comparer that defers to a child comparer when keys tie
  /// and keeps the child's cached element in step with its own.
  /// </summary>
  internal sealed class AsyncCachingComparerWithChild<TElement, TKey> : AsyncCachingComparer<TElement, TKey>
  {
      private readonly AsyncCachingComparer<TElement> _child;

      /// <summary>Stores the child comparer; the other arguments go to the base class constructor.</summary>
      public AsyncCachingComparerWithChild(Func<TElement, TKey> keySelector, IComparer<TKey> comparer, bool descending, AsyncCachingComparer<TElement> child)
          : base(keySelector, comparer, descending) => _child = child;

      /// <summary>
      /// Compares the key of <paramref name="element"/> with the cached key (operands swapped
      /// when descending). A tie is decided by the child comparer. Otherwise, when
      /// "result is negative" equals <paramref name="cacheLower"/>, the cached key is replaced
      /// and the child is told to cache the element too.
      /// </summary>
      internal override async ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = _keySelector(element);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          if (result != 0)
          {
              bool isLower = result < 0;
              if (isLower == cacheLower)
              {
                  _lastKey = candidate;
                  await _child.SetElement(element, cancellationToken).ConfigureAwait(false);
              }

              return result;
          }

          // Keys are equal at this level: the child comparer breaks the tie.
          return await _child.Compare(element, cacheLower, cancellationToken).ConfigureAwait(false);
      }

      /// <summary>Caches <paramref name="element"/> at this level first, then in the child comparer.</summary>
      internal override async ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          ValueTask own = base.SetElement(element, cancellationToken);
          await own.ConfigureAwait(false);

          ValueTask nested = _child.SetElement(element, cancellationToken);
          await nested.ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Caching comparer that extracts keys with an asynchronous selector and remembers the key
  /// of the current reference element.
  /// </summary>
  internal class AsyncCachingComparerWithTask<TElement, TKey> : AsyncCachingComparer<TElement>
  {
      protected readonly Func<TElement, ValueTask<TKey>> _keySelector;
      protected readonly IComparer<TKey> _comparer;
      protected readonly bool _descending;
      protected TKey _lastKey;

      /// <summary>Stores the key selector, the key comparer, and the ordering direction.</summary>
      public AsyncCachingComparerWithTask(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending)
      {
          _descending = descending;
          _comparer = comparer;
          _keySelector = keySelector;
      }

      /// <summary>
      /// Awaits the key of <paramref name="element"/> and compares it with the cached key
      /// (operands swapped when descending). The cached key is replaced when
      /// "result is negative" equals <paramref name="cacheLower"/>.
      /// </summary>
      internal override async ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = await _keySelector(element).ConfigureAwait(false);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          bool isLower = result < 0;
          if (isLower == cacheLower)
          {
              _lastKey = candidate;
          }

          return result;
      }

      /// <summary>Awaits the key of <paramref name="element"/> and caches it as the new reference key.</summary>
      internal override async ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          TKey key = await _keySelector(element).ConfigureAwait(false);
          _lastKey = key;
      }
  }
  /// <summary>
  /// Asynchronous-selector caching comparer that defers to a child comparer when keys tie
  /// and keeps the child's cached element in step with its own.
  /// </summary>
  internal sealed class AsyncCachingComparerWithTaskAndChild<TElement, TKey> : AsyncCachingComparerWithTask<TElement, TKey>
  {
      private readonly AsyncCachingComparer<TElement> _child;

      /// <summary>Stores the child comparer; the other arguments go to the base class constructor.</summary>
      public AsyncCachingComparerWithTaskAndChild(Func<TElement, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, AsyncCachingComparer<TElement> child)
          : base(keySelector, comparer, descending) => _child = child;

      /// <summary>
      /// Awaits the key of <paramref name="element"/> and compares it with the cached key
      /// (operands swapped when descending). A tie is decided by the child comparer. Otherwise,
      /// when "result is negative" equals <paramref name="cacheLower"/>, the cached key is
      /// replaced and the child is told to cache the element too.
      /// </summary>
      internal override async ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = await _keySelector(element).ConfigureAwait(false);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          if (result != 0)
          {
              bool isLower = result < 0;
              if (isLower == cacheLower)
              {
                  _lastKey = candidate;
                  await _child.SetElement(element, cancellationToken).ConfigureAwait(false);
              }

              return result;
          }

          // Keys are equal at this level: the child comparer breaks the tie.
          return await _child.Compare(element, cacheLower, cancellationToken).ConfigureAwait(false);
      }

      /// <summary>Caches <paramref name="element"/> at this level first, then in the child comparer.</summary>
      internal override async ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          ValueTask own = base.SetElement(element, cancellationToken);
          await own.ConfigureAwait(false);

          ValueTask nested = _child.SetElement(element, cancellationToken);
          await nested.ConfigureAwait(false);
      }
  }
  955. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Caching comparer that extracts keys with an asynchronous, cancellation-aware selector
  /// and remembers the key of the current reference element.
  /// </summary>
  internal class AsyncCachingComparerWithTaskAndCancellation<TElement, TKey> : AsyncCachingComparer<TElement>
  {
      protected readonly Func<TElement, CancellationToken, ValueTask<TKey>> _keySelector;
      protected readonly IComparer<TKey> _comparer;
      protected readonly bool _descending;
      protected TKey _lastKey;

      /// <summary>Stores the key selector, the key comparer, and the ordering direction.</summary>
      public AsyncCachingComparerWithTaskAndCancellation(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending)
      {
          _descending = descending;
          _comparer = comparer;
          _keySelector = keySelector;
      }

      /// <summary>
      /// Awaits the key of <paramref name="element"/> (passing <paramref name="cancellationToken"/>
      /// to the selector) and compares it with the cached key (operands swapped when descending).
      /// The cached key is replaced when "result is negative" equals <paramref name="cacheLower"/>.
      /// </summary>
      internal override async ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = await _keySelector(element, cancellationToken).ConfigureAwait(false);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          bool isLower = result < 0;
          if (isLower == cacheLower)
          {
              _lastKey = candidate;
          }

          return result;
      }

      /// <summary>
      /// Awaits the key of <paramref name="element"/> (passing <paramref name="cancellationToken"/>
      /// to the selector) and caches it as the new reference key.
      /// </summary>
      internal override async ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          TKey key = await _keySelector(element, cancellationToken).ConfigureAwait(false);
          _lastKey = key;
      }
  }
  /// <summary>
  /// Asynchronous, cancellation-aware caching comparer that defers to a child comparer when
  /// keys tie and keeps the child's cached element in step with its own.
  /// </summary>
  internal sealed class AsyncCachingComparerWithTaskAndCancellationAndChild<TElement, TKey> : AsyncCachingComparerWithTaskAndCancellation<TElement, TKey>
  {
      private readonly AsyncCachingComparer<TElement> _child;

      /// <summary>Stores the child comparer; the other arguments go to the base class constructor.</summary>
      public AsyncCachingComparerWithTaskAndCancellationAndChild(Func<TElement, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending, AsyncCachingComparer<TElement> child)
          : base(keySelector, comparer, descending) => _child = child;

      /// <summary>
      /// Awaits the key of <paramref name="element"/> (passing <paramref name="cancellationToken"/>
      /// to the selector) and compares it with the cached key (operands swapped when descending).
      /// A tie is decided by the child comparer. Otherwise, when "result is negative" equals
      /// <paramref name="cacheLower"/>, the cached key is replaced and the child is told to
      /// cache the element too.
      /// </summary>
      internal override async ValueTask<int> Compare(TElement element, bool cacheLower, CancellationToken cancellationToken)
      {
          TKey candidate = await _keySelector(element, cancellationToken).ConfigureAwait(false);

          int result;
          if (_descending)
          {
              result = _comparer.Compare(_lastKey, candidate);
          }
          else
          {
              result = _comparer.Compare(candidate, _lastKey);
          }

          if (result != 0)
          {
              bool isLower = result < 0;
              if (isLower == cacheLower)
              {
                  _lastKey = candidate;
                  await _child.SetElement(element, cancellationToken).ConfigureAwait(false);
              }

              return result;
          }

          // Keys are equal at this level: the child comparer breaks the tie.
          return await _child.Compare(element, cacheLower, cancellationToken).ConfigureAwait(false);
      }

      /// <summary>Caches <paramref name="element"/> at this level first, then in the child comparer.</summary>
      internal override async ValueTask SetElement(TElement element, CancellationToken cancellationToken)
      {
          ValueTask own = base.SetElement(element, cancellationToken);
          await own.ConfigureAwait(false);

          ValueTask nested = _child.SetElement(element, cancellationToken);
          await nested.ConfigureAwait(false);
      }
  }
  1012. #endif
  1013. }