Merge.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Reactive.Disposables;
  8. #if !NO_TPL
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. #endif
  12. namespace System.Reactive.Linq.ObservableImpl
  13. {
  14. class Merge<TSource> : Producer<TSource>
  15. {
  // Outer sequence of inner observable sequences; assigned by both constructors in this group.
  private readonly IObservable<IObservable<TSource>> _sources;

  // Concurrency limit handed to the two-argument constructor. Run only selects the
  // MergeConcurrent sink when this is greater than zero, so 0 means "no limit configured".
  private readonly int _maxConcurrent;

  /// <summary>
  /// Creates a merge over <paramref name="sources"/> with no concurrency limit configured
  /// (the limit is stored as 0).
  /// </summary>
  /// <param name="sources">Outer sequence whose inner sequences are merged.</param>
  public Merge(IObservable<IObservable<TSource>> sources)
      : this(sources, 0)
  {
  }

  /// <summary>
  /// Creates a merge over <paramref name="sources"/> and records <paramref name="maxConcurrent"/>
  /// as the concurrency limit. The value is stored as-is; no validation happens here.
  /// </summary>
  /// <param name="sources">Outer sequence whose inner sequences are merged.</param>
  /// <param name="maxConcurrent">Limit compared against the number of active inner subscriptions.</param>
  public Merge(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  {
      _maxConcurrent = maxConcurrent;
      _sources = sources;
  }
  27. #if !NO_TPL
  // Outer sequence of tasks. Only the constructor below assigns it, so it stays null
  // when the operator was built from observable sequences instead.
  private readonly IObservable<Task<TSource>> _sourcesT;

  /// <summary>
  /// Creates a merge over a sequence of tasks; each task's outcome is forwarded by the
  /// task-based sink selected in Run.
  /// </summary>
  /// <param name="sources">Outer sequence of tasks to merge.</param>
  public Merge(IObservable<Task<TSource>> sources)
  {
      this._sourcesT = sources;
  }
  33. #endif
  /// <summary>
  /// Picks the sink matching how this operator was constructed, registers it through
  /// <paramref name="setSink"/>, and starts it.
  /// </summary>
  /// <param name="observer">Downstream observer handed to the sink.</param>
  /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the sink before it is run.</param>
  /// <returns>The disposable returned by the chosen sink's Run method.</returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // A positive limit takes precedence over every other mode.
      if (_maxConcurrent > 0)
      {
          var concurrentSink = new MergeConcurrent(this, observer, cancel);
          setSink(concurrentSink);
          return concurrentSink.Run();
      }

#if !NO_TPL
      // A non-null task source means the task-based constructor was used.
      if (_sourcesT != null)
      {
          var taskSink = new MergeImpl(this, observer, cancel);
          setSink(taskSink);
          return taskSink.Run();
      }
#endif

      // Otherwise: merge of observable sequences without a concurrency limit.
      var unboundedSink = new _(this, observer, cancel);
      setSink(unboundedSink);
      return unboundedSink.Run();
  }
  /// <summary>
  /// Sink for merging inner observable sequences without a concurrency limit: every inner
  /// sequence is subscribed as soon as it arrives, and its elements are forwarded to the
  /// downstream observer under a shared gate.
  /// </summary>
  class _ : Sink<TSource>, IObserver<IObservable<TSource>>
  {
      private readonly Merge<TSource> _parent;

      public _(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Serializes every call made on the downstream observer.
      private object _gate;

      // Set by the outer OnCompleted and read by Iter.OnCompleted, in both cases outside
      // _gate and potentially on different threads. Marked volatile so the write is
      // published and the read is never served from a cached value.
      private volatile bool _isStopped;

      // Holds the outer subscription plus one entry per inner sequence still running.
      private CompositeDisposable _group;
      private SingleAssignmentDisposable _sourceSubscription;

      /// <summary>
      /// Initializes the sink state and subscribes to the outer sequence.
      /// </summary>
      /// <returns>The group containing the outer and all inner subscriptions.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _isStopped = false;
          _group = new CompositeDisposable();

          // Register the outer subscription before subscribing so that the
          // "_group.Count == 1" checks below always account for it.
          _sourceSubscription = new SingleAssignmentDisposable();
          _group.Add(_sourceSubscription);
          _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);

          return _group;
      }

      /// <summary>
      /// Subscribes to a newly arrived inner sequence and tracks its subscription in the group.
      /// </summary>
      public void OnNext(IObservable<TSource> value)
      {
          // Add the placeholder to the group first; the real subscription is assigned afterwards.
          var innerSubscription = new SingleAssignmentDisposable();
          _group.Add(innerSubscription);
          innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
      }

      /// <summary>
      /// Forwards an error of the outer sequence downstream and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Marks the outer sequence as finished. Completes downstream right away when no inner
      /// sequence is left in the group; otherwise only the outer subscription is disposed and
      /// the last inner sequence to complete signals completion.
      /// </summary>
      public void OnCompleted()
      {
          _isStopped = true;

          // Count == 1 means only the outer subscription registered in Run is left.
          if (_group.Count == 1)
          {
              //
              // Notice there can be a race between OnCompleted of the source and any
              // of the inner sequences, where both see _group.Count == 1, and one is
              // waiting for the lock. There won't be a double OnCompleted observation
              // though, because the call to Dispose silences the observer by swapping
              // in a NopObserver<T>.
              //
              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
          else
          {
              _sourceSubscription.Dispose();
          }
      }

      /// <summary>
      /// Observer attached to a single inner sequence; relays its notifications to the
      /// downstream observer under the parent's gate.
      /// </summary>
      class Iter : IObserver<TSource>
      {
          private readonly _ _parent;

          // This inner sequence's own entry in the parent's group.
          private readonly IDisposable _self;

          public Iter(_ parent, IDisposable self)
          {
              _parent = parent;
              _self = self;
          }

          /// <summary>Forwards an element downstream while holding the gate.</summary>
          public void OnNext(TSource value)
          {
              lock (_parent._gate)
              {
                  _parent._observer.OnNext(value);
              }
          }

          /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
          public void OnError(Exception error)
          {
              lock (_parent._gate)
              {
                  _parent._observer.OnError(error);
                  _parent.Dispose();
              }
          }

          /// <summary>
          /// Removes this inner subscription from the group and completes downstream when
          /// the outer sequence has already finished and no other inner sequence remains.
          /// </summary>
          public void OnCompleted()
          {
              _parent._group.Remove(_self);

              if (_parent._isStopped && _parent._group.Count == 1)
              {
                  //
                  // Notice there can be a race between OnCompleted of the source and any
                  // of the inner sequences, where both see _group.Count == 1, and one is
                  // waiting for the lock. There won't be a double OnCompleted observation
                  // though, because the call to Dispose silences the observer by swapping
                  // in a NopObserver<T>.
                  //
                  lock (_parent._gate)
                  {
                      _parent._observer.OnCompleted();
                      _parent.Dispose();
                  }
              }
          }
      }
  }
  /// <summary>
  /// Sink for merging with a concurrency limit: at most _maxConcurrent inner sequences are
  /// subscribed at a time; further ones wait in a queue until a running one completes.
  /// </summary>
  class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
  {
      private readonly Merge<TSource> _parent;

      public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Guards _q, _activeCount and _isStopped, and serializes calls on the downstream observer.
      private object _gate;

      // Inner sequences that arrived while the limit was already reached.
      private Queue<IObservable<TSource>> _q;

      // True once the outer sequence has signalled completion.
      private bool _isStopped;

      private SingleAssignmentDisposable _sourceSubscription;
      private CompositeDisposable _group;

      // Number of inner sequences currently occupying a slot.
      private int _activeCount;

      /// <summary>
      /// Initializes the sink state and subscribes to the outer sequence.
      /// </summary>
      /// <returns>The group containing the outer and all inner subscriptions.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _q = new Queue<IObservable<TSource>>();
          _isStopped = false;
          _activeCount = 0;
          _group = new CompositeDisposable();

          // Note the order: the outer subscription is established first and only then
          // added to the group.
          var outer = new SingleAssignmentDisposable();
          _sourceSubscription = outer;
          outer.Disposable = _parent._sources.SubscribeSafe(this);
          _group.Add(outer);

          return _group;
      }

      /// <summary>
      /// Subscribes to the inner sequence when a slot is free, otherwise enqueues it.
      /// </summary>
      public void OnNext(IObservable<TSource> value)
      {
          lock (_gate)
          {
              if (_activeCount >= _parent._maxConcurrent)
              {
                  // Limit reached: park the sequence until a running one completes.
                  _q.Enqueue(value);
                  return;
              }

              _activeCount++;
              Subscribe(value);
          }
      }

      /// <summary>
      /// Forwards an error of the outer sequence downstream and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Marks the outer sequence as finished. Completes downstream immediately when no inner
      /// sequence is active; otherwise only the outer subscription is disposed.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              _isStopped = true;

              if (_activeCount != 0)
              {
                  // Inner sequences are still running; the last one to finish completes downstream.
                  _sourceSubscription.Dispose();
                  return;
              }

              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      // Subscribes to one inner sequence and tracks its subscription in the group.
      private void Subscribe(IObservable<TSource> innerSource)
      {
          var slot = new SingleAssignmentDisposable();
          _group.Add(slot);

          var relay = new Iter(this, slot);
          slot.Disposable = innerSource.SubscribeSafe(relay);
      }

      /// <summary>
      /// Observer attached to a single inner sequence; relays its notifications to the
      /// downstream observer under the parent's gate.
      /// </summary>
      class Iter : IObserver<TSource>
      {
          private readonly MergeConcurrent _parent;

          // This inner sequence's own entry in the parent's group.
          private readonly IDisposable _self;

          public Iter(MergeConcurrent parent, IDisposable self)
          {
              _parent = parent;
              _self = self;
          }

          /// <summary>Forwards an element downstream while holding the gate.</summary>
          public void OnNext(TSource value)
          {
              lock (_parent._gate)
              {
                  _parent._observer.OnNext(value);
              }
          }

          /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
          public void OnError(Exception error)
          {
              lock (_parent._gate)
              {
                  _parent._observer.OnError(error);
                  _parent.Dispose();
              }
          }

          /// <summary>
          /// Releases this inner subscription, then either hands the freed slot to the next
          /// queued sequence or decrements the active count and completes downstream when the
          /// outer sequence is finished and nothing is active any more.
          /// </summary>
          public void OnCompleted()
          {
              _parent._group.Remove(_self);

              lock (_parent._gate)
              {
                  if (_parent._q.Count > 0)
                  {
                      // The slot is reused, so _activeCount stays unchanged.
                      var next = _parent._q.Dequeue();
                      _parent.Subscribe(next);
                      return;
                  }

                  _parent._activeCount--;

                  if (_parent._isStopped && _parent._activeCount == 0)
                  {
                      _parent._observer.OnCompleted();
                      _parent.Dispose();
                  }
              }
          }
      }
  }
  273. #if !NO_TPL
  274. #pragma warning disable 0420
  /// <summary>
  /// Sink for merging a sequence of tasks: each task's result is forwarded as an element,
  /// and a faulted or canceled task terminates the merged sequence with an error.
  /// </summary>
  class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
  {
      private readonly Merge<TSource> _parent;

      public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      // Serializes every call made on the downstream observer.
      private object _gate;

      // Outstanding completions: starts at 1 for the outer sequence and grows by one per
      // received task. Downstream completion is signalled when it drops to zero.
      private volatile int _count;

      /// <summary>
      /// Initializes the sink state and subscribes to the outer sequence of tasks.
      /// </summary>
      /// <returns>The subscription to the outer sequence.</returns>
      public IDisposable Run()
      {
          _gate = new object();
          _count = 1;

          return _parent._sourcesT.SubscribeSafe(this);
      }

      /// <summary>
      /// Accounts for a newly received task and processes it right away when it has already
      /// completed, or through a continuation otherwise.
      /// </summary>
      public void OnNext(Task<TSource> value)
      {
          Interlocked.Increment(ref _count);

          if (!value.IsCompleted)
          {
              value.ContinueWith(OnCompletedTask);
              return;
          }

          OnCompletedTask(value);
      }

      // Translates a finished task into the matching downstream notification.
      private void OnCompletedTask(Task<TSource> task)
      {
          var status = task.Status;

          if (status == TaskStatus.RanToCompletion)
          {
              lock (_gate)
              {
                  base._observer.OnNext(task.Result);
              }

              // Gives back the count taken for this task in OnNext.
              OnCompleted();
          }
          else if (status == TaskStatus.Faulted)
          {
              SignalError(task.Exception.InnerException);
          }
          else if (status == TaskStatus.Canceled)
          {
              SignalError(new TaskCanceledException(task));
          }
      }

      /// <summary>
      /// Forwards an error of the outer sequence downstream and disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          SignalError(error);
      }

      /// <summary>
      /// Decrements the outstanding count; whoever brings it to zero (the outer sequence or
      /// the last task) completes downstream and disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          if (Interlocked.Decrement(ref _count) != 0)
          {
              return;
          }

          lock (_gate)
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      // Sends the error downstream under the gate and then disposes the sink.
      private void SignalError(Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }
  }
  354. #pragma warning restore 0420
  355. #endif
  356. }
  357. }
  358. #endif