// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  9. internal static class Timeout<TSource>
  10. {
  /// <summary>
  /// Timeout producer driven by a relative due time: a timer of length <c>_dueTime</c> is
  /// scheduled when the sink starts and again after every forwarded element. When a timer
  /// fires while its index is still current, the sink abandons the main source and
  /// subscribes the downstream observer to <c>_other</c> instead.
  /// </summary>
  internal sealed class Relative : Producer<TSource, Relative._>
  {
      private readonly IObservable<TSource> _source;
      private readonly TimeSpan _dueTime;
      private readonly IObservable<TSource> _other;
      private readonly IScheduler _scheduler;

      /// <summary>
      /// Stores the operator arguments; nothing is subscribed or scheduled here.
      /// </summary>
      /// <param name="source">Main sequence whose elements are forwarded.</param>
      /// <param name="dueTime">Delay passed to the scheduler for each timeout timer.</param>
      /// <param name="other">Sequence subscribed to when a timer fires in time.</param>
      /// <param name="scheduler">Scheduler on which the timeout timers are scheduled.</param>
      public Relative(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
      {
          _source = source;
          _dueTime = dueTime;
          _other = other;
          _scheduler = scheduler;
      }

      protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>
      /// Sink that forwards the main source and races every element against a timer.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly TimeSpan _dueTime;
          private readonly IObservable<TSource> _other;
          private readonly IScheduler _scheduler;

          // Generation counter: every accepted OnNext moves it from idx to idx + 1.
          // long.MaxValue marks the terminal state (timed out, failed or completed).
          private long _index;

          private IDisposable _mainDisposable;
          private IDisposable _otherDisposable;
          private IDisposable _timerDisposable;

          public _(Relative parent, IObserver<TSource> observer)
              : base(observer)
          {
              _dueTime = parent._dueTime;
              _other = parent._other;
              _scheduler = parent._scheduler;
          }

          /// <summary>
          /// Schedules the first timer (index 0) and then subscribes to the main source.
          /// </summary>
          public override void Run(IObservable<TSource> source)
          {
              CreateTimer(0L);

              Disposable.SetSingle(ref _mainDisposable, source.SubscribeSafe(this));
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _mainDisposable);
                  Disposable.TryDispose(ref _otherDisposable);
                  Disposable.TryDispose(ref _timerDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Schedules a timer that calls <see cref="Timeout(long)"/> with
          /// <paramref name="idx"/> after <c>_dueTime</c>.
          /// </summary>
          private void CreateTimer(long idx)
          {
              // Probe the field first so nothing is scheduled when the probe fails
              // (presumably because the field was already disposed - confirm against
              // Disposable.TrySetMultiple).
              if (Disposable.TrySetMultiple(ref _timerDisposable, null))
              {
                  var d = _scheduler.ScheduleAction(
                      (idx, instance: this),
                      _dueTime,
                      state => { state.instance.Timeout(state.idx); });

                  Disposable.TrySetMultiple(ref _timerDisposable, d);
              }
          }

          /// <summary>
          /// Timer callback: switches to <c>_other</c> if no source signal has been
          /// accepted since the timer with index <paramref name="idx"/> was created.
          /// </summary>
          private void Timeout(long idx)
          {
              // Cheap read first, then the CAS that actually claims the terminal state.
              if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, long.MaxValue, idx) == idx)
              {
                  Disposable.TryDispose(ref _mainDisposable);

                  // SubscribeSafe, as already used for the main source in Run, keeps the
                  // fallback subscription on the same error-handling path instead of a
                  // bare Subscribe call.
                  var d = _other.SubscribeSafe(GetForwarder());

                  Disposable.SetSingle(ref _otherDisposable, d);
              }
          }

          /// <summary>
          /// Forwards <paramref name="value"/> and restarts the timer, unless the sink
          /// is already terminal or the index changed concurrently.
          /// </summary>
          public override void OnNext(TSource value)
          {
              var idx = Volatile.Read(ref _index);
              if (idx != long.MaxValue && Interlocked.CompareExchange(ref _index, idx + 1, idx) == idx)
              {
                  // Do not swap in the BooleanDisposable.True here
                  // As we'll need _timerDisposable to store the next timer
                  // BD.True would cancel it immediately and break the operation
                  Volatile.Read(ref _timerDisposable)?.Dispose();

                  ForwardOnNext(value);

                  CreateTimer(idx + 1);
              }
          }

          /// <summary>
          /// Forwards the error unless the sink had already reached the terminal state.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
              {
                  Disposable.TryDispose(ref _timerDisposable);

                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Forwards completion unless the sink had already reached the terminal state.
          /// </summary>
          public override void OnCompleted()
          {
              if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
              {
                  Disposable.TryDispose(ref _timerDisposable);

                  ForwardOnCompleted();
              }
          }
      }
  }
  /// <summary>
  /// Timeout producer driven by an absolute due time: a single action is scheduled for
  /// <c>_dueTime</c>, and when it runs the sink replaces the main subscription with a
  /// subscription to <c>_other</c>.
  /// </summary>
  internal sealed class Absolute : Producer<TSource, Absolute._>
  {
      private readonly IObservable<TSource> _source;
      private readonly IObservable<TSource> _other;
      private readonly DateTimeOffset _dueTime;
      private readonly IScheduler _scheduler;

      /// <summary>
      /// Stores the operator arguments; nothing is subscribed or scheduled here.
      /// </summary>
      public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
      {
          _source = source;
          _other = other;
          _dueTime = dueTime;
          _scheduler = scheduler;
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(_other, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      /// <summary>
      /// Sink that forwards the main source until the scheduled timeout action runs.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly IObservable<TSource> _other;

          // Holds the main-source subscription first and the fallback subscription later.
          private IDisposable _serialDisposable;

          // 0 = idle. An OnNext holds it at 1 while forwarding; the timeout increments it
          // and never gives it back, and OnError/OnCompleted claim it for good as well.
          private int _wip;

          public _(IObservable<TSource> other, IObserver<TSource> observer)
              : base(observer)
          {
              _other = other;
          }

          /// <summary>
          /// Schedules the timeout action for the parent's due time, then subscribes to
          /// the parent's main source.
          /// </summary>
          public void Run(Absolute parent)
          {
              var timer = parent._scheduler.ScheduleAction(this, parent._dueTime, sink => sink.Timeout());
              SetUpstream(timer);

              var mainSubscription = parent._source.SubscribeSafe(this);
              Disposable.TrySetSingle(ref _serialDisposable, mainSubscription);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _serialDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Scheduled action. Switches to the fallback right away when no OnNext is in
          /// flight; otherwise the increment is left for that OnNext to observe.
          /// </summary>
          private void Timeout()
          {
              if (Interlocked.Increment(ref _wip) != 1)
              {
                  return;
              }

              SubscribeToOther();
          }

          /// <summary>
          /// Forwards <paramref name="value"/> if the sink is idle, then performs the
          /// switch to the fallback when the timeout arrived during the forward.
          /// </summary>
          public override void OnNext(TSource value)
          {
              if (Interlocked.CompareExchange(ref _wip, 1, 0) != 0)
              {
                  return;
              }

              ForwardOnNext(value);

              // A non-zero count after releasing our claim means Timeout() incremented it.
              if (Interlocked.Decrement(ref _wip) == 0)
              {
                  return;
              }

              SubscribeToOther();
          }

          /// <summary>
          /// Forwards the error only if the sink is idle; the claim is never released.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (Interlocked.CompareExchange(ref _wip, 1, 0) != 0)
              {
                  return;
              }

              ForwardOnError(error);
          }

          /// <summary>
          /// Forwards completion only if the sink is idle; the claim is never released.
          /// </summary>
          public override void OnCompleted()
          {
              if (Interlocked.CompareExchange(ref _wip, 1, 0) != 0)
              {
                  return;
              }

              ForwardOnCompleted();
          }

          // Subscribes the downstream forwarder to the fallback sequence and stores the
          // resulting subscription in the serial slot.
          private void SubscribeToOther()
          {
              Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
          }
      }
  }
  177. }
  /// <summary>
  /// Timeout producer whose timers are observable sequences: <c>_firstTimeout</c> guards
  /// the first element and <c>_timeoutSelector</c> supplies the timeout sequence for each
  /// element after it has been forwarded. When a timeout sequence signals while its index
  /// is still current, the sink switches to <c>_other</c>.
  /// </summary>
  internal sealed class Timeout<TSource, TTimeout> : Producer<TSource, Timeout<TSource, TTimeout>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly IObservable<TSource> _other;
      private readonly IObservable<TTimeout> _firstTimeout;
      private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;

      /// <summary>
      /// Stores the operator arguments; nothing is subscribed here.
      /// </summary>
      public Timeout(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutSelector, IObservable<TSource> other)
      {
          _source = source;
          _other = other;
          _firstTimeout = firstTimeout;
          _timeoutSelector = timeoutSelector;
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      /// <summary>
      /// Sink that forwards the main source and races each element against the timeout
      /// sequence selected for the previous one.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
          private readonly IObservable<TSource> _other;

          private IDisposable _sourceDisposable;
          private IDisposable _timerDisposable;

          // Generation counter: each accepted OnNext moves it from idx to idx + 1.
          // long.MaxValue marks the terminal state.
          private long _index;

          public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
              : base(observer)
          {
              _timeoutSelector = parent._timeoutSelector;
              _other = parent._other;
          }

          /// <summary>
          /// Arms the first timeout (index 0) and then subscribes to the main source.
          /// </summary>
          public void Run(Timeout<TSource, TTimeout> parent)
          {
              SetTimer(parent._firstTimeout, 0L);

              var upstream = parent._source.SubscribeSafe(this);
              Disposable.TrySetSingle(ref _sourceDisposable, upstream);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _sourceDisposable);
                  Disposable.TryDispose(ref _timerDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Forwards <paramref name="value"/>, then arms the timeout sequence selected
          /// for it. A throwing selector is reported downstream as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              var current = Volatile.Read(ref _index);
              if (current == long.MaxValue)
              {
                  return;
              }

              var next = current + 1;
              if (Interlocked.CompareExchange(ref _index, next, current) != current)
              {
                  return;
              }

              // Do not use Disposable.TryDispose here, we need the field
              // for the next timer
              Volatile.Read(ref _timerDisposable)?.Dispose();

              ForwardOnNext(value);

              IObservable<TTimeout> nextTimeout;
              try
              {
                  nextTimeout = _timeoutSelector(value);
              }
              catch (Exception ex)
              {
                  ForwardOnError(ex);
                  return;
              }

              SetTimer(nextTimeout, next);
          }

          /// <summary>
          /// Forwards the error unless the sink had already reached the terminal state.
          /// </summary>
          public override void OnError(Exception error)
          {
              var previous = Interlocked.Exchange(ref _index, long.MaxValue);
              if (previous == long.MaxValue)
              {
                  return;
              }

              ForwardOnError(error);
          }

          /// <summary>
          /// Forwards completion unless the sink had already reached the terminal state.
          /// </summary>
          public override void OnCompleted()
          {
              var previous = Interlocked.Exchange(ref _index, long.MaxValue);
              if (previous == long.MaxValue)
              {
                  return;
              }

              ForwardOnCompleted();
          }

          /// <summary>
          /// Called when the timeout sequence with index <paramref name="idx"/> signals;
          /// switches to <c>_other</c> if that index is still the current one.
          /// </summary>
          private void Timeout(long idx)
          {
              // Cheap read first, then the CAS that actually claims the terminal state.
              if (Volatile.Read(ref _index) != idx
                  || Interlocked.CompareExchange(ref _index, long.MaxValue, idx) != idx)
              {
                  return;
              }

              Disposable.TrySetSerial(ref _sourceDisposable, _other.SubscribeSafe(GetForwarder()));
          }

          /// <summary>
          /// Called when the timeout sequence with index <paramref name="idx"/> fails.
          /// </summary>
          /// <returns>
          /// <c>true</c> if the index was still current and the error was forwarded;
          /// <c>false</c> if the error was ignored.
          /// </returns>
          private bool TimeoutError(long idx, Exception error)
          {
              if (Volatile.Read(ref _index) != idx
                  || Interlocked.CompareExchange(ref _index, long.MaxValue, idx) != idx)
              {
                  return false;
              }

              ForwardOnError(error);
              return true;
          }

          /// <summary>
          /// Installs a new observer for <paramref name="timeout"/> in the timer slot and,
          /// if the slot accepted it, subscribes that observer to the sequence.
          /// </summary>
          private void SetTimer(IObservable<TTimeout> timeout, long idx)
          {
              var watcher = new TimeoutObserver(this, idx);
              if (!Disposable.TrySetSerial(ref _timerDisposable, watcher))
              {
                  return;
              }

              var subscription = timeout.Subscribe(watcher);
              watcher.SetResource(subscription);
          }

          /// <summary>
          /// Observer of a single timeout sequence; reports back to the parent sink with
          /// the index it was created for.
          /// </summary>
          private sealed class TimeoutObserver : SafeObserver<TTimeout>
          {
              private readonly _ _parent;
              private readonly long _id;

              public TimeoutObserver(_ parent, long id)
              {
                  _parent = parent;
                  _id = id;
              }

              // Any element of the timeout sequence is handled exactly like its completion.
              public override void OnNext(TTimeout value)
              {
                  OnCompleted();
              }

              // Disposes this observer only when the parent did not accept the error.
              public override void OnError(Exception error)
              {
                  var accepted = _parent.TimeoutError(_id, error);
                  if (!accepted)
                  {
                      Dispose();
                  }
              }

              public override void OnCompleted()
              {
                  _parent.Timeout(_id);

                  Dispose();
              }
          }
      }
  }
  314. }