1
0

QueryLanguage.Conversions.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. namespace System.Reactive.Linq
  7. {
  8. using ObservableImpl;
  9. internal partial class QueryLanguage
  10. {
  11. #region + Subscribe +
  12. public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
  13. {
  14. return Subscribe_(source, observer, SchedulerDefaults.Iteration);
  15. }
  16. public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
  17. {
  18. return Subscribe_(source, observer, scheduler);
  19. }
  20. private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
  21. {
  22. var longRunning = scheduler.AsLongRunning();
  23. if (longRunning != null)
  24. {
  25. //
  26. // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
  27. //
  28. return new ToObservableLongRunning<TSource>(source, longRunning).Subscribe/*Unsafe*/(observer);
  29. }
  30. //
  31. // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
  32. //
  33. return new ToObservableRecursive<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
  34. }
  35. #endregion
  36. #region + ToEnumerable +
  37. public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
  38. {
  39. return new AnonymousEnumerable<TSource>(() => source.GetEnumerator());
  40. }
  41. #endregion
  42. #region ToEvent
  43. public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
  44. {
  45. return new EventSource<Unit>(source, (h, _) => h(Unit.Default));
  46. }
  47. public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
  48. {
  49. return new EventSource<TSource>(source, (h, value) => h(value));
  50. }
  51. #endregion
  52. #region ToEventPattern
  53. public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
  54. {
  55. return new EventPatternSource<TEventArgs>(
  56. source,
  57. (h, evt) => h(evt.Sender, evt.EventArgs)
  58. );
  59. }
  60. #endregion
  61. #region + ToObservable +
  62. public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
  63. {
  64. return ToObservable_(source, SchedulerDefaults.Iteration);
  65. }
  66. public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
  67. {
  68. return ToObservable_(source, scheduler);
  69. }
  70. private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
  71. {
  72. var longRunning = scheduler.AsLongRunning();
  73. if (longRunning != null)
  74. {
  75. //
  76. // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
  77. //
  78. return new ToObservableLongRunning<TSource>(source, longRunning);
  79. }
  80. //
  81. // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
  82. //
  83. return new ToObservableRecursive<TSource>(source, scheduler);
  84. }
  85. #endregion
  86. }
  87. }