AsyncInfoToObservableBridge.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using Windows.Foundation;
  8. namespace System.Reactive.Windows.Foundation
  9. {
  /// <summary>
  /// Observable wrapper around an <see cref="IAsyncInfo"/> object. The terminal state of the
  /// asynchronous operation (result, error, or cancellation) is pushed into an internal
  /// <see cref="AsyncSubject{TResult}"/>, and subscribers are attached to that subject.
  /// </summary>
  /// <typeparam name="TResult">Type of the values surfaced to observers.</typeparam>
  /// <typeparam name="TProgress">Type of the progress notifications raised by the operation.</typeparam>
  class AsyncInfoToObservableBridge<TResult, TProgress> : ObservableBase<TResult>
  {
      private readonly Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> _onCompleted;
      private readonly Func<IAsyncInfo, TResult> _getResult;
      private readonly AsyncSubject<TResult> _subject;

      /// <summary>
      /// Creates the bridge, optionally hooks up progress reporting, and immediately inspects the
      /// current status of <paramref name="info"/>.
      /// </summary>
      /// <param name="info">The asynchronous operation to observe.</param>
      /// <param name="onCompleted">Callback used to register a completion handler on the operation.</param>
      /// <param name="getResult">Callback used to read the operation's result; may be null, in which case no value is emitted.</param>
      /// <param name="onProgress">Callback used to register a progress handler on the operation; may be null to skip progress wiring.</param>
      /// <param name="progress">Receiver for progress notifications; may be null.</param>
      /// <param name="multiValue">When true (and <paramref name="getResult"/> is not null), each progress notification also pushes the current result into the subject.</param>
      public AsyncInfoToObservableBridge(IAsyncInfo info, Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> onCompleted, Func<IAsyncInfo, TResult> getResult, Action<IAsyncInfo, Action<IAsyncInfo, TProgress>> onProgress, IProgress<TProgress> progress, bool multiValue)
      {
          _subject = new AsyncSubject<TResult>();
          _onCompleted = onCompleted;
          _getResult = getResult;

          if (onProgress != null)
          {
              onProgress(info, (source, progressValue) =>
              {
                  if (multiValue && getResult != null)
                  {
                      _subject.OnNext(getResult(source));
                  }

                  if (progress != null)
                  {
                      progress.Report(progressValue);
                  }
              });
          }

          // The operation may already have finished; Done registers a completion handler if it has not.
          Done(info, info.Status, true);
      }

      /// <summary>
      /// Translates the given status of the operation into notifications on the subject, or, on the
      /// initial call for an operation that has not finished yet, registers itself as the completion handler.
      /// </summary>
      /// <param name="info">The asynchronous operation being observed.</param>
      /// <param name="status">The status to act upon.</param>
      /// <param name="initial">True for the call made from the constructor; false when invoked from the completion handler.</param>
      /// <exception cref="InvalidOperationException">
      /// The operation reports an error without an error code, or the completion handler was invoked
      /// with a status other than error, canceled, or completed.
      /// </exception>
      private void Done(IAsyncInfo info, AsyncStatus status, bool initial)
      {
          Exception failure = null;
          TResult value = default(TResult);

          //
          // First talk to the IAsyncInfo object itself. A failure at this stage points at a
          // misbehaving implementation and is simply allowed to propagate to the caller.
          //
          if (status == AsyncStatus.Error)
          {
              failure = info.ErrorCode;
              if (failure == null)
              {
                  throw new InvalidOperationException("The asynchronous operation failed with a null error code.");
              }
          }
          else if (status == AsyncStatus.Canceled)
          {
              failure = new OperationCanceledException();
          }
          else if (status == AsyncStatus.Completed)
          {
              if (_getResult != null)
              {
                  value = _getResult(info);
              }
          }
          else
          {
              // Any other status means the operation has not reached a terminal state.
              if (!initial)
              {
                  throw new InvalidOperationException("The asynchronous operation completed unexpectedly.");
              }

              _onCompleted(info, (source, finalStatus) => Done(source, finalStatus, false));
              return;
          }

          //
          // Close the operation before any continuation gets a chance to run and fail. When the
          // code above throws, the object is deliberately left open so the faulty implementation
          // can still be inspected in a debugger. _getResult is only an indirection to the matching
          // GetResults method, which is expected to signal failure through an Error status
          // instead of throwing.
          //
          info.Close();

          //
          // Finally notify the observers. This can take arbitrarily long, and any failure here is
          // catastrophic and in the hands of the upstream subscriber.
          //
          if (failure != null)
          {
              _subject.OnError(failure);
              return;
          }

          if (_getResult != null)
          {
              _subject.OnNext(value);
          }

          _subject.OnCompleted();
      }

      /// <summary>
      /// Subscribes the observer to the internal subject.
      /// </summary>
      /// <param name="observer">Observer to attach.</param>
      /// <returns>The subscription handle returned by the subject.</returns>
      protected override IDisposable SubscribeCore(IObserver<TResult> observer)
      {
          var subscription = _subject.Subscribe(observer);
          return subscription;
      }
  }
  88. }
  89. #endif