ToDictionary.cs 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq.Observαble
  6. {
  7. class ToDictionary<TSource, TKey, TElement> : Producer<IDictionary<TKey, TElement>>
  8. {
  9. private readonly IObservable<TSource> _source;
  10. private readonly Func<TSource, TKey> _keySelector;
  11. private readonly Func<TSource, TElement> _elementSelector;
  12. private readonly IEqualityComparer<TKey> _comparer;
  13. public ToDictionary(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  14. {
  15. _source = source;
  16. _keySelector = keySelector;
  17. _elementSelector = elementSelector;
  18. _comparer = comparer;
  19. }
  20. protected override IDisposable Run(IObserver<IDictionary<TKey, TElement>> observer, IDisposable cancel, Action<IDisposable> setSink)
  21. {
  22. var sink = new _(this, observer, cancel);
  23. setSink(sink);
  24. return _source.SubscribeSafe(sink);
  25. }
  26. class _ : Sink<IDictionary<TKey, TElement>>, IObserver<TSource>
  27. {
  28. private readonly ToDictionary<TSource, TKey, TElement> _parent;
  29. private Dictionary<TKey, TElement> _dictionary;
  30. public _(ToDictionary<TSource, TKey, TElement> parent, IObserver<IDictionary<TKey, TElement>> observer, IDisposable cancel)
  31. : base(observer, cancel)
  32. {
  33. _parent = parent;
  34. _dictionary = new Dictionary<TKey, TElement>(_parent._comparer);
  35. }
  36. public void OnNext(TSource value)
  37. {
  38. try
  39. {
  40. _dictionary.Add(_parent._keySelector(value), _parent._elementSelector(value));
  41. }
  42. catch (Exception ex)
  43. {
  44. base._observer.OnError(ex);
  45. base.Dispose();
  46. }
  47. }
  48. public void OnError(Exception error)
  49. {
  50. base._observer.OnError(error);
  51. base.Dispose();
  52. }
  53. public void OnCompleted()
  54. {
  55. base._observer.OnNext(_dictionary);
  56. base._observer.OnCompleted();
  57. base.Dispose();
  58. }
  59. }
  60. }
  61. }
  62. #endif