GroupByUntil.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. namespace System.Reactive.Linq.Observαble
  8. {
  /// <summary>
  /// Producer behind the GroupByUntil operator. Each source element is mapped to a key and an
  /// element; every distinct key gets its own group (backed by a subject), and a per-group
  /// duration sequence obtained from the duration selector decides when that group is completed
  /// and forgotten, so that a later element with the same key starts a fresh group.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements pushed into the groups.</typeparam>
  /// <typeparam name="TDuration">Type of the elements in the duration sequences; values are ignored.</typeparam>
  class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, TKey> _keySelector;
      private readonly Func<TSource, TElement> _elementSelector;
      private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
      private readonly int? _capacity;
      private readonly IEqualityComparer<TKey> _comparer;

      /// <summary>
      /// Captures the operator arguments. No subscription work happens here.
      /// </summary>
      /// <param name="source">Sequence whose elements are grouped.</param>
      /// <param name="keySelector">Computes the key of a source element.</param>
      /// <param name="elementSelector">Computes the element pushed into the group for a source element.</param>
      /// <param name="durationSelector">Given a new group, returns the sequence whose first signal ends that group.</param>
      /// <param name="capacity">Optional initial capacity for the key-to-group map.</param>
      /// <param name="comparer">Equality comparer used for keys in the map.</param>
      public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
      {
          _source = source;
          _keySelector = keySelector;
          _elementSelector = elementSelector;
          _durationSelector = durationSelector;
          _capacity = capacity;
          _comparer = comparer;
      }

      /// <summary>
      /// Sets up one subscription: creates the sink, subscribes it to the source and returns the
      /// ref-counted disposable that wraps the source and duration subscriptions.
      /// </summary>
      protected override IDisposable Run(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          //
          // The disposables are created per call and handed to the sink. Keeping them in fields of
          // this producer would let a second Run call overwrite them, after which the first sink
          // would register its duration subscriptions with (and hand out group handles tied to)
          // the second subscription's disposables.
          //
          var groupDisposable = new CompositeDisposable();
          var refCountDisposable = new RefCountDisposable(groupDisposable);

          var sink = new _(this, groupDisposable, refCountDisposable, observer, cancel);
          setSink(sink);

          groupDisposable.Add(_source.SubscribeSafe(sink));

          return refCountDisposable;
      }

      /// <summary>
      /// Observer of the source sequence for a single subscription. Owns the key-to-subject map,
      /// the separate slot for the null key, and the disposables of that subscription.
      /// </summary>
      class _ : Sink<IGroupedObservable<TKey, TElement>>, IObserver<TSource>
      {
          private readonly GroupByUntil<TSource, TKey, TElement, TDuration> _parent;
          private readonly CompositeDisposable _groupDisposable;
          private readonly RefCountDisposable _refCountDisposable;
          private readonly Map<TKey, ISubject<TElement>> _map;
          private readonly object _nullGate;

          // Subject of the currently live group for the null key, or null if there is none.
          // Guarded by _nullGate.
          private ISubject<TElement> _null;

          public _(GroupByUntil<TSource, TKey, TElement, TDuration> parent, CompositeDisposable groupDisposable, RefCountDisposable refCountDisposable, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
              _groupDisposable = groupDisposable;
              _refCountDisposable = refCountDisposable;
              _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
              _nullGate = new object();
          }

          /// <summary>
          /// Routes a source element to the group for its key, creating and announcing the group
          /// (and subscribing to its duration sequence) when the key has no live group. Any
          /// exception thrown by a selector or by the map terminates everything through Error.
          /// </summary>
          public void OnNext(TSource value)
          {
              var key = default(TKey);
              try
              {
                  key = _parent._keySelector(value);
              }
              catch (Exception exception)
              {
                  Error(exception);
                  return;
              }

              var fireNewMapEntry = false;
              var writer = default(ISubject<TElement>);
              try
              {
                  //
                  // Note: The box instruction in the IL will be erased by the JIT in case T is
                  //       a value type. In fact, the whole if block will go away and we'll end
                  //       up with nothing but the GetOrAdd call below.
                  //
                  //       See GroupBy for more information and confirmation of this fact using
                  //       the SOS debugger extension.
                  //
                  if (key == null)
                  {
                      // The null key cannot go through the map, so it has its own slot.
                      lock (_nullGate)
                      {
                          if (_null == null)
                          {
                              _null = new Subject<TElement>();
                              fireNewMapEntry = true;
                          }

                          writer = _null;
                      }
                  }
                  else
                  {
                      writer = _map.GetOrAdd(key, () => new Subject<TElement>(), out fireNewMapEntry);
                  }
              }
              catch (Exception exception)
              {
                  Error(exception);
                  return;
              }

              if (fireNewMapEntry)
              {
                  // The group handed downstream is tied to the ref-count disposable; the one given
                  // to the duration selector is not.
                  var group = new GroupedObservable<TKey, TElement>(key, writer, _refCountDisposable);

                  var duration = default(IObservable<TDuration>);

                  var durationGroup = new GroupedObservable<TKey, TElement>(key, writer);
                  try
                  {
                      duration = _parent._durationSelector(durationGroup);
                  }
                  catch (Exception exception)
                  {
                      Error(exception);
                      return;
                  }

                  // NOTE(review): the lock target is the observer field owned by the Sink base
                  // class; if that field can be reassigned (e.g. on dispose) a dedicated gate
                  // object would be safer - confirm against Sink before changing.
                  lock (base._observer)
                      base._observer.OnNext(group);

                  // Register the handle before subscribing so that a duration sequence firing
                  // during SubscribeSafe can already remove it.
                  var md = new SingleAssignmentDisposable();
                  _groupDisposable.Add(md);
                  md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md));
              }

              var element = default(TElement);
              try
              {
                  element = _parent._elementSelector(value);
              }
              catch (Exception exception)
              {
                  Error(exception);
                  return;
              }

              //
              // ISSUE: Rx v1.x shipped without proper handling of the case where the duration
              //        sequence fires concurrently with the OnNext code path here. In such a
              //        case, the subject can be completed before we get a chance to send out
              //        a new element. However, a resurrected group for the same key won't get
              //        to see the element either. To guard against this case, we'd have to
              //        check whether the OnNext call below lost the race, and resurrect a new
              //        group if needed. Unfortunately, this complicates matters when the
              //        duration selector triggers synchronously (e.g. Return or Empty), which
              //        causes the group to terminate immediately. We should not get stuck in
              //        this case, repeatedly trying to resurrect a group that always ends
              //        before we can send the element into it. Also, users may expect this
              //        base case to mean no elements will ever be produced, so sending the
              //        element into the group before starting the duration sequence may not
              //        be a good idea either. For the time being, we'll leave this as-is and
              //        revisit the behavior for vNext. Nonetheless, we'll add synchronization
              //        to ensure no concurrent calls to the subject are made.
              //
              lock (writer)
                  writer.OnNext(element);
          }

          /// <summary>
          /// Observer of one group's duration sequence. The first value or completion ends the
          /// group; an error terminates the whole operator.
          /// </summary>
          class δ : IObserver<TDuration>
          {
              private readonly _ _parent;
              private readonly TKey _key;
              private readonly ISubject<TElement> _writer;
              private readonly IDisposable _self;

              public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
              {
                  _parent = parent;
                  _key = key;
                  _writer = writer;
                  _self = self;
              }

              public void OnNext(TDuration value)
              {
                  // Any value ends the group, exactly like completion does.
                  OnCompleted();
              }

              public void OnError(Exception error)
              {
                  _parent.Error(error);
                  _self.Dispose();
              }

              public void OnCompleted()
              {
                  if (_key == null)
                  {
                      //
                      // Only retire the null-key group if it is still the one this observer was
                      // created for. This method can be reached more than once for the same
                      // observer (OnNext routes here, and the duration sequence may complete
                      // afterwards); by then the slot is either empty - locking on it would
                      // throw - or holds a newer group that must not be completed from here.
                      //
                      var retired = false;
                      lock (_parent._nullGate)
                      {
                          if (object.ReferenceEquals(_parent._null, _writer))
                          {
                              _parent._null = null;
                              retired = true;
                          }
                      }

                      if (retired)
                      {
                          lock (_writer)
                              _writer.OnCompleted();
                      }
                  }
                  else
                  {
                      if (_parent._map.Remove(_key))
                      {
                          lock (_writer)
                              _writer.OnCompleted();
                      }
                  }

                  _parent._groupDisposable.Remove(_self);
              }
          }

          public void OnError(Exception error)
          {
              Error(error);
          }

          /// <summary>
          /// Completes every live group, then the downstream observer, then disposes the sink.
          /// </summary>
          public void OnCompleted()
          {
              //
              // NOTE: A race with OnCompleted triggered by a duration selector is fine when
              //       using Subject<T>. It will transition into a terminal state, making one
              //       of the two calls a no-op by swapping in a DoneObserver<T>.
              //
              var @null = default(ISubject<TElement>);
              lock (_nullGate)
                  @null = _null;

              if (@null != null)
                  @null.OnCompleted();

              foreach (var w in _map.Values)
                  w.OnCompleted();

              lock (base._observer)
                  base._observer.OnCompleted();

              base.Dispose();
          }

          /// <summary>
          /// Propagates an error to every live group, then to the downstream observer, then
          /// disposes the sink. Shared by the source's OnError, selector failures and duration
          /// sequence errors.
          /// </summary>
          private void Error(Exception exception)
          {
              //
              // NOTE: A race with OnCompleted triggered by a duration selector is fine when
              //       using Subject<T>. It will transition into a terminal state, making one
              //       of the two calls a no-op by swapping in a DoneObserver<T>.
              //
              var @null = default(ISubject<TElement>);
              lock (_nullGate)
                  @null = _null;

              if (@null != null)
                  @null.OnError(exception);

              foreach (var w in _map.Values)
                  w.OnError(exception);

              lock (base._observer)
                  base._observer.OnError(exception);

              base.Dispose();
          }
      }
  }
  235. #if !NO_CDS
  /// <summary>
  /// Thin wrapper over a concurrent dictionary exposing only what the grouping operators need:
  /// get-or-add that reports whether the entry was created, a snapshot of the values, and removal.
  /// </summary>
  class Map<TKey, TValue>
  {
#if !NO_CDS_COLLECTIONS
      // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs

      // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. A higher
      // multiplier allows more concurrent writes without interference and blocking, but makes
      // operations that require all locks (e.g. table resizing, ToArray, Count) more expensive.
      // According to brief benchmarks that we ran, 4 seems like a good compromise.
      private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;

      private static int DefaultConcurrencyLevel
      {
          get
          {
              return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount;
          }
      }
#endif

      private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;

      /// <summary>
      /// Creates the map with the given key comparer, pre-sized when a capacity is supplied.
      /// </summary>
      public Map(int? capacity, IEqualityComparer<TKey> comparer)
      {
          if (!capacity.HasValue)
          {
              _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
              return;
          }

#if NO_CDS_COLLECTIONS
          _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
#else
          _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
#endif
      }

      /// <summary>
      /// Returns the value stored for <paramref name="key"/>, inserting one produced by
      /// <paramref name="valueFactory"/> when the key is absent. The factory is invoked at most
      /// once per call; <paramref name="added"/> is true only when this call inserted the value.
      /// </summary>
      public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
      {
          added = false;

          TValue current;
          if (_map.TryGetValue(key, out current))
              return current;

          // Build the candidate once, then alternate add/lookup until one of them succeeds.
          var candidate = valueFactory();
          do
          {
              if (_map.TryAdd(key, candidate))
              {
                  added = true;
                  return candidate;
              }
          }
          while (!_map.TryGetValue(key, out current));

          return current;
      }

      /// <summary>
      /// Gets a copy of the values currently in the map.
      /// </summary>
      public IEnumerable<TValue> Values
      {
          get
          {
              var snapshot = _map.Values.ToArray();
              return snapshot;
          }
      }

      /// <summary>
      /// Removes the entry for <paramref name="key"/>; returns whether an entry was removed.
      /// </summary>
      public bool Remove(TKey key)
      {
          TValue discarded;
          var removed = _map.TryRemove(key, out discarded);
          return removed;
      }
  }
  304. #else
  /// <summary>
  /// Fallback map for platforms without concurrent collections: a plain dictionary whose every
  /// operation runs under a lock on the dictionary instance.
  /// </summary>
  class Map<TKey, TValue>
  {
      private readonly Dictionary<TKey, TValue> _map;

      /// <summary>
      /// Creates the map with the given key comparer, pre-sized when a capacity is supplied.
      /// </summary>
      public Map(int? capacity, IEqualityComparer<TKey> comparer)
      {
          if (!capacity.HasValue)
          {
              _map = new Dictionary<TKey, TValue>(comparer);
              return;
          }

          _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
      }

      /// <summary>
      /// Returns the value stored for <paramref name="key"/>, inserting one produced by
      /// <paramref name="valueFactory"/> when the key is absent. <paramref name="added"/> is
      /// true only when this call inserted the value.
      /// </summary>
      public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
      {
          lock (_map)
          {
              added = false;

              TValue existing;
              if (_map.TryGetValue(key, out existing))
                  return existing;

              // Absent: create and store the value while still holding the lock.
              var created = valueFactory();
              _map.Add(key, created);
              added = true;
              return created;
          }
      }

      /// <summary>
      /// Gets a copy of the values currently in the map, taken under the lock.
      /// </summary>
      public IEnumerable<TValue> Values
      {
          get
          {
              TValue[] snapshot;
              lock (_map)
              {
                  snapshot = _map.Values.ToArray();
              }

              return snapshot;
          }
      }

      /// <summary>
      /// Removes the entry for <paramref name="key"/>; returns whether an entry was removed.
      /// </summary>
      public bool Remove(TKey key)
      {
          bool removed;
          lock (_map)
          {
              removed = _map.Remove(key);
          }

          return removed;
      }
  }
  352. #endif
  353. }
  354. #endif