GroupedObservable.cs 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Disposables;
  3. using System.Reactive.Subjects;
  4. namespace System.Reactive.Linq
  5. {
  6. class GroupedObservable<TKey, TElement> : ObservableBase<TElement>, IGroupedObservable<TKey, TElement>
  7. {
  8. private readonly TKey _key;
  9. private readonly IObservable<TElement> _subject;
  10. private readonly RefCountDisposable _refCount;
  11. public GroupedObservable(TKey key, ISubject<TElement> subject, RefCountDisposable refCount)
  12. {
  13. _key = key;
  14. _subject = subject;
  15. _refCount = refCount;
  16. }
  17. public GroupedObservable(TKey key, ISubject<TElement> subject)
  18. {
  19. _key = key;
  20. _subject = subject;
  21. }
  22. public TKey Key
  23. {
  24. get { return _key; }
  25. }
  26. protected override IDisposable SubscribeCore(IObserver<TElement> observer)
  27. {
  28. if (_refCount != null)
  29. {
  30. //
  31. // [OK] Use of unsafe Subscribe: called on a known subject implementation.
  32. //
  33. var release = _refCount.GetDisposable();
  34. var subscription = _subject.Subscribe/*Unsafe*/(observer);
  35. return StableCompositeDisposable.Create(release, subscription);
  36. }
  37. else
  38. {
  39. //
  40. // [OK] Use of unsafe Subscribe: called on a known subject implementation.
  41. //
  42. return _subject.Subscribe/*Unsafe*/(observer);
  43. }
  44. }
  45. }
  46. }