AsyncInfoToObservableBridge.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_WINRT
  3. using System.Reactive.Disposables;
  4. using System.Reactive.Subjects;
  5. using Windows.Foundation;
  6. namespace System.Reactive.Windows.Foundation
  7. {
  8. class AsyncInfoToObservableBridge<TResult, TProgress> : ObservableBase<TResult>
  9. {
  10. private readonly Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> _onCompleted;
  11. private readonly Func<IAsyncInfo, TResult> _getResult;
  12. private readonly AsyncSubject<TResult> _subject;
  13. public AsyncInfoToObservableBridge(IAsyncInfo info, Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> onCompleted, Func<IAsyncInfo, TResult> getResult, Action<IAsyncInfo, Action<IAsyncInfo, TProgress>> onProgress, IProgress<TProgress> progress, bool multiValue)
  14. {
  15. _onCompleted = onCompleted;
  16. _getResult = getResult;
  17. _subject = new AsyncSubject<TResult>();
  18. if (onProgress != null)
  19. {
  20. onProgress(info, (iai, p) =>
  21. {
  22. if (multiValue && getResult != null)
  23. _subject.OnNext(getResult(iai));
  24. if (progress != null)
  25. progress.Report(p);
  26. });
  27. }
  28. Done(info, info.Status, true);
  29. }
  30. private void Done(IAsyncInfo info, AsyncStatus status, bool initial)
  31. {
  32. var error = default(Exception);
  33. var result = default(TResult);
  34. //
  35. // Initial interactions with the IAsyncInfo object. Those could fail, which indicates
  36. // a rogue implementation. Failure is just propagated out.
  37. //
  38. switch (status)
  39. {
  40. case AsyncStatus.Error:
  41. error = info.ErrorCode;
  42. if (error == null)
  43. throw new InvalidOperationException("The asynchronous operation failed with a null error code.");
  44. break;
  45. case AsyncStatus.Canceled:
  46. error = new OperationCanceledException();
  47. break;
  48. case AsyncStatus.Completed:
  49. if (_getResult != null)
  50. result = _getResult(info);
  51. break;
  52. default:
  53. if (!initial)
  54. throw new InvalidOperationException("The asynchronous operation completed unexpectedly.");
  55. _onCompleted(info, (iai, s) => Done(iai, s, false));
  56. return;
  57. }
  58. //
  59. // Close as early as possible, before running continuations which could fail. In case of
  60. // failure above, we don't close out the object in order to allow for debugging of the
  61. // rogue implementation without losing state prematurely. Notice _getResults is merely
  62. // an indirect call to the appropriate GetResults method, which is not supposed to throw.
  63. // Instead, an Error status should be returned.
  64. //
  65. info.Close();
  66. //
  67. // Now we run the continuations, which could take a long time. Failure here is catastrophic
  68. // and under control of the upstream subscriber.
  69. //
  70. if (error != null)
  71. {
  72. _subject.OnError(error);
  73. }
  74. else
  75. {
  76. if (_getResult != null)
  77. _subject.OnNext(result);
  78. _subject.OnCompleted();
  79. }
  80. }
  81. protected override IDisposable SubscribeCore(IObserver<TResult> observer)
  82. {
  83. return _subject.Subscribe(observer);
  84. }
  85. }
  86. }
  87. #endif