Merge.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. class Merge<TSource> : Producer<TSource>
  12. {
  13. private readonly IObservable<IObservable<TSource>> _sources;
  14. private readonly IObservable<Task<TSource>> _sourcesT;
  15. private readonly int _maxConcurrent;
  16. public Merge(IObservable<IObservable<TSource>> sources)
  17. {
  18. _sources = sources;
  19. }
  20. public Merge(IObservable<IObservable<TSource>> sources, int maxConcurrent)
  21. {
  22. _sources = sources;
  23. _maxConcurrent = maxConcurrent;
  24. }
  25. public Merge(IObservable<Task<TSource>> sources)
  26. {
  27. _sourcesT = sources;
  28. }
  29. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  30. {
  31. if (_maxConcurrent > 0)
  32. {
  33. var sink = new MergeConcurrent(this, observer, cancel);
  34. setSink(sink);
  35. return sink.Run();
  36. }
  37. else if (_sourcesT != null)
  38. {
  39. var sink = new MergeImpl(this, observer, cancel);
  40. setSink(sink);
  41. return sink.Run();
  42. }
  43. else
  44. {
  45. var sink = new _(this, observer, cancel);
  46. setSink(sink);
  47. return sink.Run();
  48. }
  49. }
  50. class _ : Sink<TSource>, IObserver<IObservable<TSource>>
  51. {
  52. private readonly Merge<TSource> _parent;
  53. public _(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  54. : base(observer, cancel)
  55. {
  56. _parent = parent;
  57. }
  58. private object _gate;
  59. private bool _isStopped;
  60. private CompositeDisposable _group;
  61. private SingleAssignmentDisposable _sourceSubscription;
  62. public IDisposable Run()
  63. {
  64. _gate = new object();
  65. _isStopped = false;
  66. _group = new CompositeDisposable();
  67. _sourceSubscription = new SingleAssignmentDisposable();
  68. _group.Add(_sourceSubscription);
  69. _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);
  70. return _group;
  71. }
  72. public void OnNext(IObservable<TSource> value)
  73. {
  74. var innerSubscription = new SingleAssignmentDisposable();
  75. _group.Add(innerSubscription);
  76. innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
  77. }
  78. public void OnError(Exception error)
  79. {
  80. lock (_gate)
  81. {
  82. base._observer.OnError(error);
  83. base.Dispose();
  84. }
  85. }
  86. public void OnCompleted()
  87. {
  88. _isStopped = true;
  89. if (_group.Count == 1)
  90. {
  91. //
  92. // Notice there can be a race between OnCompleted of the source and any
  93. // of the inner sequences, where both see _group.Count == 1, and one is
  94. // waiting for the lock. There won't be a double OnCompleted observation
  95. // though, because the call to Dispose silences the observer by swapping
  96. // in a NopObserver<T>.
  97. //
  98. lock (_gate)
  99. {
  100. base._observer.OnCompleted();
  101. base.Dispose();
  102. }
  103. }
  104. else
  105. {
  106. _sourceSubscription.Dispose();
  107. }
  108. }
  109. class Iter : IObserver<TSource>
  110. {
  111. private readonly _ _parent;
  112. private readonly IDisposable _self;
  113. public Iter(_ parent, IDisposable self)
  114. {
  115. _parent = parent;
  116. _self = self;
  117. }
  118. public void OnNext(TSource value)
  119. {
  120. lock (_parent._gate)
  121. _parent._observer.OnNext(value);
  122. }
  123. public void OnError(Exception error)
  124. {
  125. lock (_parent._gate)
  126. {
  127. _parent._observer.OnError(error);
  128. _parent.Dispose();
  129. }
  130. }
  131. public void OnCompleted()
  132. {
  133. _parent._group.Remove(_self);
  134. if (_parent._isStopped && _parent._group.Count == 1)
  135. {
  136. //
  137. // Notice there can be a race between OnCompleted of the source and any
  138. // of the inner sequences, where both see _group.Count == 1, and one is
  139. // waiting for the lock. There won't be a double OnCompleted observation
  140. // though, because the call to Dispose silences the observer by swapping
  141. // in a NopObserver<T>.
  142. //
  143. lock (_parent._gate)
  144. {
  145. _parent._observer.OnCompleted();
  146. _parent.Dispose();
  147. }
  148. }
  149. }
  150. }
  151. }
  152. class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
  153. {
  154. private readonly Merge<TSource> _parent;
  155. public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  156. : base(observer, cancel)
  157. {
  158. _parent = parent;
  159. }
  160. private object _gate;
  161. private Queue<IObservable<TSource>> _q;
  162. private bool _isStopped;
  163. private SingleAssignmentDisposable _sourceSubscription;
  164. private CompositeDisposable _group;
  165. private int _activeCount = 0;
  166. public IDisposable Run()
  167. {
  168. _gate = new object();
  169. _q = new Queue<IObservable<TSource>>();
  170. _isStopped = false;
  171. _activeCount = 0;
  172. _group = new CompositeDisposable();
  173. _sourceSubscription = new SingleAssignmentDisposable();
  174. _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this);
  175. _group.Add(_sourceSubscription);
  176. return _group;
  177. }
  178. public void OnNext(IObservable<TSource> value)
  179. {
  180. lock (_gate)
  181. {
  182. if (_activeCount < _parent._maxConcurrent)
  183. {
  184. _activeCount++;
  185. Subscribe(value);
  186. }
  187. else
  188. _q.Enqueue(value);
  189. }
  190. }
  191. public void OnError(Exception error)
  192. {
  193. lock (_gate)
  194. {
  195. base._observer.OnError(error);
  196. base.Dispose();
  197. }
  198. }
  199. public void OnCompleted()
  200. {
  201. lock (_gate)
  202. {
  203. _isStopped = true;
  204. if (_activeCount == 0)
  205. {
  206. base._observer.OnCompleted();
  207. base.Dispose();
  208. }
  209. else
  210. {
  211. _sourceSubscription.Dispose();
  212. }
  213. }
  214. }
  215. private void Subscribe(IObservable<TSource> innerSource)
  216. {
  217. var subscription = new SingleAssignmentDisposable();
  218. _group.Add(subscription);
  219. subscription.Disposable = innerSource.SubscribeSafe(new Iter(this, subscription));
  220. }
  221. class Iter : IObserver<TSource>
  222. {
  223. private readonly MergeConcurrent _parent;
  224. private readonly IDisposable _self;
  225. public Iter(MergeConcurrent parent, IDisposable self)
  226. {
  227. _parent = parent;
  228. _self = self;
  229. }
  230. public void OnNext(TSource value)
  231. {
  232. lock (_parent._gate)
  233. _parent._observer.OnNext(value);
  234. }
  235. public void OnError(Exception error)
  236. {
  237. lock (_parent._gate)
  238. {
  239. _parent._observer.OnError(error);
  240. _parent.Dispose();
  241. }
  242. }
  243. public void OnCompleted()
  244. {
  245. _parent._group.Remove(_self);
  246. lock (_parent._gate)
  247. {
  248. if (_parent._q.Count > 0)
  249. {
  250. var s = _parent._q.Dequeue();
  251. _parent.Subscribe(s);
  252. }
  253. else
  254. {
  255. _parent._activeCount--;
  256. if (_parent._isStopped && _parent._activeCount == 0)
  257. {
  258. _parent._observer.OnCompleted();
  259. _parent.Dispose();
  260. }
  261. }
  262. }
  263. }
  264. }
  265. }
  266. class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
  267. {
  268. private readonly Merge<TSource> _parent;
  269. public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  270. : base(observer, cancel)
  271. {
  272. _parent = parent;
  273. }
  274. private object _gate;
  275. private volatile int _count;
  276. public IDisposable Run()
  277. {
  278. _gate = new object();
  279. _count = 1;
  280. return _parent._sourcesT.SubscribeSafe(this);
  281. }
  282. public void OnNext(Task<TSource> value)
  283. {
  284. Interlocked.Increment(ref _count);
  285. if (value.IsCompleted)
  286. {
  287. OnCompletedTask(value);
  288. }
  289. else
  290. {
  291. value.ContinueWith(OnCompletedTask);
  292. }
  293. }
  294. private void OnCompletedTask(Task<TSource> task)
  295. {
  296. switch (task.Status)
  297. {
  298. case TaskStatus.RanToCompletion:
  299. {
  300. lock (_gate)
  301. base._observer.OnNext(task.Result);
  302. OnCompleted();
  303. }
  304. break;
  305. case TaskStatus.Faulted:
  306. {
  307. lock (_gate)
  308. {
  309. base._observer.OnError(task.Exception.InnerException);
  310. base.Dispose();
  311. }
  312. }
  313. break;
  314. case TaskStatus.Canceled:
  315. {
  316. lock (_gate)
  317. {
  318. base._observer.OnError(new TaskCanceledException(task));
  319. base.Dispose();
  320. }
  321. }
  322. break;
  323. }
  324. }
  325. public void OnError(Exception error)
  326. {
  327. lock (_gate)
  328. {
  329. base._observer.OnError(error);
  330. base.Dispose();
  331. }
  332. }
  333. public void OnCompleted()
  334. {
  335. if (Interlocked.Decrement(ref _count) == 0)
  336. {
  337. lock (_gate)
  338. {
  339. base._observer.OnCompleted();
  340. base.Dispose();
  341. }
  342. }
  343. }
  344. }
  345. }
  346. }