AsyncInfoToObservableBridge.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. using System.Reactive.Subjects;
  6. using Windows.Foundation;
  7. namespace System.Reactive.Windows.Foundation
  8. {
  9. internal class AsyncInfoToObservableBridge<TResult, TProgress> : ObservableBase<TResult>
  10. {
  11. private readonly Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> _onCompleted;
  12. private readonly Func<IAsyncInfo, TResult> _getResult;
  13. private readonly AsyncSubject<TResult> _subject;
  14. public AsyncInfoToObservableBridge(IAsyncInfo info, Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> onCompleted, Func<IAsyncInfo, TResult> getResult, Action<IAsyncInfo, Action<IAsyncInfo, TProgress>> onProgress, IProgress<TProgress> progress, bool multiValue)
  15. {
  16. _onCompleted = onCompleted;
  17. _getResult = getResult;
  18. _subject = new AsyncSubject<TResult>();
  19. onProgress?.Invoke(info, (iai, p) =>
  20. {
  21. if (multiValue && getResult != null)
  22. {
  23. _subject.OnNext(getResult(iai));
  24. }
  25. progress?.Report(p);
  26. });
  27. Done(info, info.Status, true);
  28. }
  29. private void Done(IAsyncInfo info, AsyncStatus status, bool initial)
  30. {
  31. var error = default(Exception);
  32. var result = default(TResult);
  33. //
  34. // Initial interactions with the IAsyncInfo object. Those could fail, which indicates
  35. // a rogue implementation. Failure is just propagated out.
  36. //
  37. switch (status)
  38. {
  39. case AsyncStatus.Error:
  40. error = info.ErrorCode;
  41. if (error == null)
  42. {
  43. throw new InvalidOperationException("The asynchronous operation failed with a null error code.");
  44. }
  45. break;
  46. case AsyncStatus.Canceled:
  47. error = new OperationCanceledException();
  48. break;
  49. case AsyncStatus.Completed:
  50. if (_getResult != null)
  51. {
  52. result = _getResult(info);
  53. }
  54. break;
  55. default:
  56. if (!initial)
  57. {
  58. throw new InvalidOperationException("The asynchronous operation completed unexpectedly.");
  59. }
  60. _onCompleted(info, (iai, s) => Done(iai, s, false));
  61. return;
  62. }
  63. //
  64. // Close as early as possible, before running continuations which could fail. In case of
  65. // failure above, we don't close out the object in order to allow for debugging of the
  66. // rogue implementation without losing state prematurely. Notice _getResults is merely
  67. // an indirect call to the appropriate GetResults method, which is not supposed to throw.
  68. // Instead, an Error status should be returned.
  69. //
  70. info.Close();
  71. //
  72. // Now we run the continuations, which could take a long time. Failure here is catastrophic
  73. // and under control of the upstream subscriber.
  74. //
  75. if (error != null)
  76. {
  77. _subject.OnError(error);
  78. }
  79. else
  80. {
  81. if (_getResult != null)
  82. {
  83. _subject.OnNext(result);
  84. }
  85. _subject.OnCompleted();
  86. }
  87. }
  88. protected override IDisposable SubscribeCore(IObserver<TResult> observer)
  89. {
  90. return _subject.Subscribe(observer);
  91. }
  92. }
  93. }
  94. #endif