QueryLanguage.Conversions.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq
  6. {
  7. #if !NO_PERF
  8. using ObservableImpl;
  9. #endif
  10. internal partial class QueryLanguage
  11. {
  12. #region + Subscribe +
  /// <summary>
  /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
  /// using <c>SchedulerDefaults.Iteration</c> as the scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to subscribe to.</param>
  /// <param name="observer">Observer that receives the notifications.</param>
  /// <returns>The disposable returned by the shared <c>Subscribe_</c> helper.</returns>
  public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;

      return Subscribe_(source, observer, defaultScheduler);
  }
  /// <summary>
  /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
  /// using the caller-supplied <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to subscribe to.</param>
  /// <param name="observer">Observer that receives the notifications.</param>
  /// <param name="scheduler">Scheduler handed to the shared <c>Subscribe_</c> helper.</param>
  /// <returns>The disposable returned by the shared <c>Subscribe_</c> helper.</returns>
  public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
  {
      // Pure forwarding overload; all of the work lives in Subscribe_.
      return Subscribe_(source, observer, scheduler);
  }
  /// <summary>
  /// Shared implementation behind the <c>Subscribe</c> overloads: delivers the elements of
  /// <paramref name="source"/> to <paramref name="observer"/> via <paramref name="scheduler"/>.
  /// </summary>
  /// <remarks>
  /// Unless NO_PERF is defined, the work is delegated to the <c>ToObservable&lt;TSource&gt;</c> producer.
  /// With NO_PERF, a recursive scheduled action pulls one element per invocation and the returned
  /// <c>BooleanDisposable</c> acts as the cancellation flag checked at the start of each invocation.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to iterate.</param>
  /// <param name="observer">Observer that receives OnNext / OnError / OnCompleted.</param>
  /// <param name="scheduler">Scheduler on which the iteration is performed.</param>
  /// <returns>A disposable representing the subscription.</returns>
  private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
  {
#if !NO_PERF
      //
      // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
      //
      var producer = new ToObservable<TSource>(source, scheduler);
      return producer.Subscribe/*Unsafe*/(observer);
#else
      var enumerator = source.GetEnumerator();
      var cancel = new BooleanDisposable();

      scheduler.Schedule(next =>
      {
          // The subscription was disposed: release the enumerator and stop rescheduling.
          if (cancel.IsDisposed)
          {
              enumerator.Dispose();
              return;
          }

          bool moved = false;
          Exception error = null;
          TSource value = default(TSource);

          // Both MoveNext and Current may throw; capture the failure so it is reported
          // through OnError outside of the try block.
          try
          {
              moved = enumerator.MoveNext();
              if (moved)
              {
                  value = enumerator.Current;
              }
          }
          catch (Exception caught)
          {
              error = caught;
          }

          // Terminal cases dispose the enumerator before notifying the observer.
          if (error != null)
          {
              enumerator.Dispose();
              observer.OnError(error);
              return;
          }

          if (!moved)
          {
              enumerator.Dispose();
              observer.OnCompleted();
              return;
          }

          observer.OnNext(value);

          // Invoke the continuation handed to this action to request the next iteration.
          next();
      });

      return cancel;
#endif
  }
  71. #endregion
  72. #region + ToEnumerable +
  /// <summary>
  /// Wraps the observable <paramref name="source"/> in an <c>AnonymousEnumerable&lt;TSource&gt;</c>
  /// whose enumerator factory calls <c>source.GetEnumerator()</c> each time it is invoked.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Observable sequence to expose as an enumerable.</param>
  /// <returns>The enumerable wrapper around <paramref name="source"/>.</returns>
  public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
  {
      var enumerable = new AnonymousEnumerable<TSource>(() =>
      {
          return source.GetEnumerator();
      });

      return enumerable;
  }
  77. #endregion
  78. #region ToEvent
  /// <summary>
  /// Exposes an observable sequence of <c>Unit</c> as an event source. The handler is
  /// always invoked with <c>Unit.Default</c>; the element supplied to the callback is ignored.
  /// </summary>
  /// <param name="source">Observable sequence to expose as an event source.</param>
  /// <returns>An <c>EventSource&lt;Unit&gt;</c> built over <paramref name="source"/>.</returns>
  public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
  {
      var eventSource = new EventSource<Unit>(
          source,
          (handler, ignored) => handler(Unit.Default));

      return eventSource;
  }
  /// <summary>
  /// Exposes an observable sequence as an event source whose handler is invoked with
  /// each element passed to the callback.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Observable sequence to expose as an event source.</param>
  /// <returns>An <c>EventSource&lt;TSource&gt;</c> built over <paramref name="source"/>.</returns>
  public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
  {
      var eventSource = new EventSource<TSource>(
          source,
          (handler, element) => handler(element));

      return eventSource;
  }
  87. #endregion
  88. #region ToEventPattern
  /// <summary>
  /// Exposes an observable sequence of event patterns as an event-pattern source whose
  /// handler is invoked with each pattern's <c>Sender</c> and <c>EventArgs</c>.
  /// </summary>
  /// <remarks>
  /// When NO_VARIANCE is defined, each element is explicitly cast to
  /// <c>EventPattern&lt;object, TEventArgs&gt;</c> through a <c>Select</c> projection;
  /// otherwise <paramref name="source"/> is passed through unchanged.
  /// </remarks>
  /// <typeparam name="TEventArgs">Type of the event arguments carried by each pattern.</typeparam>
  /// <param name="source">Observable sequence of event patterns.</param>
  /// <returns>An <c>EventPatternSource&lt;TEventArgs&gt;</c> built over <paramref name="source"/>.</returns>
  public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
#if !NO_EVENTARGS_CONSTRAINT
      where TEventArgs : EventArgs
#endif
  {
#if !NO_VARIANCE
      var patterns = source;
#else
      var patterns = source.Select(x => (EventPattern<object, TEventArgs>)x);
#endif

      return new EventPatternSource<TEventArgs>(
          patterns,
          (handler, pattern) => handler(pattern.Sender, pattern.EventArgs));
  }
  103. #endregion
  104. #region + ToObservable +
  /// <summary>
  /// Converts the enumerable <paramref name="source"/> to an observable sequence,
  /// using <c>SchedulerDefaults.Iteration</c> as the scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to convert.</param>
  /// <returns>An observable sequence over <paramref name="source"/>.</returns>
  public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;

#if !NO_PERF
      return new ToObservable<TSource>(source, defaultScheduler);
#else
      return ToObservable_(source, defaultScheduler);
#endif
  }
  /// <summary>
  /// Converts the enumerable <paramref name="source"/> to an observable sequence,
  /// using the caller-supplied <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to convert.</param>
  /// <param name="scheduler">Scheduler passed to the underlying implementation.</param>
  /// <returns>An observable sequence over <paramref name="source"/>.</returns>
  public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
  {
      // The dedicated producer is used unless NO_PERF selects the anonymous-observable fallback.
#if !NO_PERF
      return new ToObservable<TSource>(source, scheduler);
#else
      return ToObservable_<TSource>(source, scheduler);
#endif
  }
  121. #if NO_PERF
  /// <summary>
  /// NO_PERF fallback for <c>ToObservable</c>: builds an <c>AnonymousObservable&lt;TSource&gt;</c>
  /// whose subscribe function calls <c>source.Subscribe(observer, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Enumerable sequence to convert.</param>
  /// <param name="scheduler">Scheduler forwarded to each subscription.</param>
  /// <returns>An observable sequence over <paramref name="source"/>.</returns>
  private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
  {
      return new AnonymousObservable<TSource>(sink =>
      {
          return source.Subscribe(sink, scheduler);
      });
  }
  126. #endif
  127. #endregion
  128. }
  129. }