ReplaySubject.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. using System.Threading;
  5. namespace System.Reactive.Subjects
  6. {
  7. /// <summary>
  8. /// Represents an object that is both an observable sequence as well as an observer.
  9. /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
  10. /// </summary>
  11. /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  12. public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
  13. {
  14. private readonly IReplaySubjectImplementation _implementation;
  15. /// <summary>
  16. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size, window and scheduler.
  17. /// </summary>
  18. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  19. /// <param name="window">Maximum time length of the replay buffer.</param>
  20. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  21. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  22. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  23. public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
  24. {
  25. _implementation = new ReplayByTime(bufferSize, window, scheduler);
  26. }
  27. /// <summary>
  28. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and window.
  29. /// </summary>
  30. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  31. /// <param name="window">Maximum time length of the replay buffer.</param>
  32. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
  33. public ReplaySubject(int bufferSize, TimeSpan window)
  34. {
  35. _implementation = new ReplayByTime(bufferSize, window);
  36. }
  37. /// <summary>
  38. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
  39. /// </summary>
  40. public ReplaySubject()
  41. {
  42. _implementation = new ReplayAll();
  43. }
  44. /// <summary>
  45. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified scheduler.
  46. /// </summary>
  47. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  48. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  49. public ReplaySubject(IScheduler scheduler)
  50. {
  51. _implementation = new ReplayByTime(scheduler);
  52. }
  53. //TODO: Does this overload make any sense with the optimisations? Surely this now is just <c>new ReplaySubject<T>(bufferSize).SubscribeOn(scheduler)</c>?
  54. //Potentially should be marked as obsolete
  55. /// <summary>
  56. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and scheduler.
  57. /// </summary>
  58. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  59. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  60. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  61. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  62. public ReplaySubject(int bufferSize, IScheduler scheduler)
  63. {
  64. _implementation = new ReplayByTime(bufferSize, scheduler);
  65. }
  66. /// <summary>
  67. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
  68. /// </summary>
  69. /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
  70. /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
  71. public ReplaySubject(int bufferSize)
  72. {
  73. switch (bufferSize)
  74. {
  75. case 1:
  76. _implementation = new ReplayOne();
  77. break;
  78. case int.MaxValue:
  79. _implementation = new ReplayAll();
  80. break;
  81. default:
  82. _implementation = new ReplayMany(bufferSize);
  83. break;
  84. }
  85. }
  86. /// <summary>
  87. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window and scheduler.
  88. /// </summary>
  89. /// <param name="window">Maximum time length of the replay buffer.</param>
  90. /// <param name="scheduler">Scheduler the observers are invoked on.</param>
  91. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  92. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  93. public ReplaySubject(TimeSpan window, IScheduler scheduler)
  94. {
  95. _implementation = new ReplayByTime(window, scheduler);
  96. }
  97. /// <summary>
  98. /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window.
  99. /// </summary>
  100. /// <param name="window">Maximum time length of the replay buffer.</param>
  101. /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
  102. public ReplaySubject(TimeSpan window)
  103. {
  104. _implementation = new ReplayByTime(window);
  105. }
  106. /// <summary>
  107. /// Indicates whether the subject has observers subscribed to it.
  108. /// </summary>
  109. public bool HasObservers
  110. {
  111. get { return _implementation.HasObservers; }
  112. }
  113. /// <summary>
  114. /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
  115. /// </summary>
  116. /// <param name="value">The value to send to all observers.</param>
  117. public void OnNext(T value)
  118. {
  119. _implementation.OnNext(value);
  120. }
  121. /// <summary>
  122. /// Notifies all subscribed and future observers about the specified exception.
  123. /// </summary>
  124. /// <param name="error">The exception to send to all observers.</param>
  125. /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  126. public void OnError(Exception error)
  127. {
  128. _implementation.OnError(error);
  129. }
  130. /// <summary>
  131. /// Notifies all subscribed and future observers about the end of the sequence.
  132. /// </summary>
  133. public void OnCompleted()
  134. {
  135. _implementation.OnCompleted();
  136. }
  137. /// <summary>
  138. /// Subscribes an observer to the subject.
  139. /// </summary>
  140. /// <param name="observer">Observer to subscribe to the subject.</param>
  141. /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
  142. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  143. public IDisposable Subscribe(IObserver<T> observer)
  144. {
  145. return _implementation.Subscribe(observer);
  146. }
  147. /// <summary>
  148. /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribe all observers.
  149. /// </summary>
  150. public void Dispose()
  151. {
  152. _implementation.Dispose();
  153. }
  154. private interface IReplaySubjectImplementation : ISubject<T>, IDisposable
  155. {
  156. bool HasObservers { get; }
  157. void Unsubscribe(IObserver<T> observer);
  158. }
  159. private class Subscription : IDisposable
  160. {
  161. private IReplaySubjectImplementation _subject;
  162. private IObserver<T> _observer;
  163. public Subscription(IReplaySubjectImplementation subject, IObserver<T> observer)
  164. {
  165. _subject = subject;
  166. _observer = observer;
  167. }
  168. public void Dispose()
  169. {
  170. var observer = Interlocked.Exchange(ref _observer, null);
  171. if (observer == null)
  172. return;
  173. _subject.Unsubscribe(observer);
  174. _subject = null;
  175. }
  176. }
  177. /// <summary>
  178. /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
  179. /// </summary>
  180. private sealed class ReplayByTime : IReplaySubjectImplementation
  181. {
  182. private const int InfiniteBufferSize = int.MaxValue;
  183. private readonly int _bufferSize;
  184. private readonly TimeSpan _window;
  185. private readonly IScheduler _scheduler;
  186. private readonly IStopwatch _stopwatch;
  187. private readonly Queue<TimeInterval<T>> _queue;
  188. private bool _isStopped;
  189. private Exception _error;
  190. private ImmutableList<ScheduledObserver<T>> _observers;
  191. private bool _isDisposed;
  192. private readonly object _gate = new object();
  193. public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
  194. {
  195. if (bufferSize < 0)
  196. throw new ArgumentOutOfRangeException("bufferSize");
  197. if (window < TimeSpan.Zero)
  198. throw new ArgumentOutOfRangeException("window");
  199. if (scheduler == null)
  200. throw new ArgumentNullException("scheduler");
  201. _bufferSize = bufferSize;
  202. _window = window;
  203. _scheduler = scheduler;
  204. _stopwatch = _scheduler.StartStopwatch();
  205. _queue = new Queue<TimeInterval<T>>();
  206. _isStopped = false;
  207. _error = null;
  208. _observers = ImmutableList<ScheduledObserver<T>>.Empty;
  209. }
  210. public ReplayByTime(int bufferSize, TimeSpan window)
  211. : this(bufferSize, window, SchedulerDefaults.Iteration)
  212. {
  213. }
  214. public ReplayByTime(IScheduler scheduler)
  215. : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
  216. {
  217. }
  218. public ReplayByTime(int bufferSize, IScheduler scheduler)
  219. : this(bufferSize, TimeSpan.MaxValue, scheduler)
  220. {
  221. }
  222. public ReplayByTime(TimeSpan window, IScheduler scheduler)
  223. : this(InfiniteBufferSize, window, scheduler)
  224. {
  225. }
  226. public ReplayByTime(TimeSpan window)
  227. : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
  228. {
  229. }
  230. public bool HasObservers
  231. {
  232. get
  233. {
  234. var observers = _observers;
  235. return observers != null && observers.Data.Length > 0;
  236. }
  237. }
  238. private void Trim(TimeSpan now)
  239. {
  240. while (_queue.Count > _bufferSize)
  241. _queue.Dequeue();
  242. while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
  243. _queue.Dequeue();
  244. }
  245. public void OnNext(T value)
  246. {
  247. var o = default(ScheduledObserver<T>[]);
  248. lock (_gate)
  249. {
  250. CheckDisposed();
  251. if (!_isStopped)
  252. {
  253. var now = _stopwatch.Elapsed;
  254. _queue.Enqueue(new TimeInterval<T>(value, now));
  255. Trim(now);
  256. o = _observers.Data;
  257. foreach (var observer in o)
  258. observer.OnNext(value);
  259. }
  260. }
  261. if (o != null)
  262. foreach (var observer in o)
  263. observer.EnsureActive();
  264. }
  265. public void OnError(Exception error)
  266. {
  267. if (error == null)
  268. throw new ArgumentNullException("error");
  269. var o = default(ScheduledObserver<T>[]);
  270. lock (_gate)
  271. {
  272. CheckDisposed();
  273. if (!_isStopped)
  274. {
  275. var now = _stopwatch.Elapsed;
  276. _isStopped = true;
  277. _error = error;
  278. Trim(now);
  279. o = _observers.Data;
  280. foreach (var observer in o)
  281. observer.OnError(error);
  282. _observers = ImmutableList<ScheduledObserver<T>>.Empty;
  283. }
  284. }
  285. if (o != null)
  286. foreach (var observer in o)
  287. observer.EnsureActive();
  288. }
  289. public void OnCompleted()
  290. {
  291. var o = default(ScheduledObserver<T>[]);
  292. lock (_gate)
  293. {
  294. CheckDisposed();
  295. if (!_isStopped)
  296. {
  297. var now = _stopwatch.Elapsed;
  298. _isStopped = true;
  299. Trim(now);
  300. o = _observers.Data;
  301. foreach (var observer in o)
  302. observer.OnCompleted();
  303. _observers = ImmutableList<ScheduledObserver<T>>.Empty;
  304. }
  305. }
  306. if (o != null)
  307. foreach (var observer in o)
  308. observer.EnsureActive();
  309. }
  310. public IDisposable Subscribe(IObserver<T> observer)
  311. {
  312. if (observer == null)
  313. throw new ArgumentNullException("observer");
  314. var so = new ScheduledObserver<T>(_scheduler, observer);
  315. var n = 0;
  316. var subscription = new RemovableDisposable(this, so);
  317. lock (_gate)
  318. {
  319. CheckDisposed();
  320. //
  321. // Notice the v1.x behavior of always calling Trim is preserved here.
  322. //
  323. // This may be subject (pun intended) of debate: should this policy
  324. // only be applied while the sequence is active? With the current
  325. // behavior, a sequence will "die out" after it has terminated by
  326. // continuing to drop OnNext notifications from the queue.
  327. //
  328. // In v1.x, this behavior was due to trimming based on the clock value
  329. // returned by scheduler.Now, applied to all but the terminal message
  330. // in the queue. Using the IStopwatch has the same effect. Either way,
  331. // we guarantee the final notification will be observed, but there's
  332. // no way to retain the buffer directly. One approach is to use the
  333. // time-based TakeLast operator and apply an unbounded ReplaySubject
  334. // to it.
  335. //
  336. // To conclude, we're keeping the behavior as-is for compatibility
  337. // reasons with v1.x.
  338. //
  339. Trim(_stopwatch.Elapsed);
  340. _observers = _observers.Add(so);
  341. n = _queue.Count;
  342. foreach (var item in _queue)
  343. so.OnNext(item.Value);
  344. if (_error != null)
  345. {
  346. n++;
  347. so.OnError(_error);
  348. }
  349. else if (_isStopped)
  350. {
  351. n++;
  352. so.OnCompleted();
  353. }
  354. }
  355. so.EnsureActive(n);
  356. return subscription;
  357. }
  358. private void Unsubscribe(ScheduledObserver<T> observer)
  359. {
  360. lock (_gate)
  361. {
  362. observer.Dispose();
  363. if (!_isDisposed)
  364. {
  365. _observers = _observers.Remove(observer);
  366. }
  367. }
  368. }
  369. void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
  370. {
  371. var so = (ScheduledObserver<T>)observer;
  372. Unsubscribe(so);
  373. }
  374. sealed class RemovableDisposable : IDisposable
  375. {
  376. private readonly ReplayByTime _subject;
  377. private readonly ScheduledObserver<T> _observer;
  378. public RemovableDisposable(ReplayByTime subject, ScheduledObserver<T> observer)
  379. {
  380. _subject = subject;
  381. _observer = observer;
  382. }
  383. public void Dispose()
  384. {
  385. _observer.Dispose();
  386. _subject.Unsubscribe(_observer);
  387. }
  388. }
  389. private void CheckDisposed()
  390. {
  391. if (_isDisposed)
  392. throw new ObjectDisposedException(string.Empty);
  393. }
  394. public void Dispose()
  395. {
  396. lock (_gate)
  397. {
  398. _isDisposed = true;
  399. _observers = null;
  400. _queue.Clear();
  401. }
  402. }
  403. }
  404. //
  405. // Below are the non-time based implementations.
  406. // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
  407. // The ReplayOne implementation also removes the need to even have a queue.
  408. //
  409. private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
  410. {
  411. private bool _hasValue;
  412. private T _value;
  413. protected override void Trim()
  414. {
  415. //
  416. // No need to trim.
  417. //
  418. }
  419. protected override void AddValueToBuffer(T value)
  420. {
  421. _hasValue = true;
  422. _value = value;
  423. }
  424. protected override void ReplayBuffer(IObserver<T> observer)
  425. {
  426. if (_hasValue)
  427. observer.OnNext(_value);
  428. }
  429. protected override void Dispose(bool disposing)
  430. {
  431. base.Dispose(disposing);
  432. _value = default(T);
  433. }
  434. }
  435. private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
  436. {
  437. private readonly int _bufferSize;
  438. public ReplayMany(int bufferSize)
  439. : base(bufferSize)
  440. {
  441. _bufferSize = bufferSize;
  442. }
  443. protected override void Trim()
  444. {
  445. while (Queue.Count > _bufferSize)
  446. Queue.Dequeue();
  447. }
  448. }
  449. private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
  450. {
  451. public ReplayAll()
  452. : base(0)
  453. {
  454. }
  455. protected override void Trim()
  456. {
  457. //
  458. // Don't trim, keep all values.
  459. //
  460. }
  461. }
  462. private abstract class ReplayBufferBase : IReplaySubjectImplementation
  463. {
  464. private readonly object _gate = new object();
  465. private bool _isDisposed;
  466. private bool _isStopped;
  467. private Exception _error;
  468. private ImmutableList<IObserver<T>> _observers;
  469. protected ReplayBufferBase()
  470. {
  471. _observers = ImmutableList<IObserver<T>>.Empty;
  472. }
  473. protected abstract void Trim();
  474. protected abstract void AddValueToBuffer(T value);
  475. protected abstract void ReplayBuffer(IObserver<T> observer);
  476. public bool HasObservers
  477. {
  478. get
  479. {
  480. var observers = _observers;
  481. return observers != null && observers.Data.Length > 0;
  482. }
  483. }
  484. public void OnNext(T value)
  485. {
  486. lock (_gate)
  487. {
  488. CheckDisposed();
  489. if (!_isStopped)
  490. {
  491. AddValueToBuffer(value);
  492. Trim();
  493. var o = _observers.Data;
  494. foreach (var observer in o)
  495. observer.OnNext(value);
  496. }
  497. }
  498. }
  499. public void OnError(Exception error)
  500. {
  501. if (error == null)
  502. throw new ArgumentNullException("error");
  503. lock (_gate)
  504. {
  505. CheckDisposed();
  506. if (!_isStopped)
  507. {
  508. _isStopped = true;
  509. _error = error;
  510. Trim();
  511. var o = _observers.Data;
  512. foreach (var observer in o)
  513. observer.OnError(error);
  514. _observers = ImmutableList<IObserver<T>>.Empty;
  515. }
  516. }
  517. }
  518. public void OnCompleted()
  519. {
  520. lock (_gate)
  521. {
  522. CheckDisposed();
  523. if (!_isStopped)
  524. {
  525. _isStopped = true;
  526. Trim();
  527. var o = _observers.Data;
  528. foreach (var observer in o)
  529. observer.OnCompleted();
  530. _observers = ImmutableList<IObserver<T>>.Empty;
  531. }
  532. }
  533. }
  534. public IDisposable Subscribe(IObserver<T> observer)
  535. {
  536. if (observer == null)
  537. throw new ArgumentNullException("observer");
  538. var subscription = new Subscription(this, observer);
  539. lock (_gate)
  540. {
  541. CheckDisposed();
  542. //
  543. // Notice the v1.x behavior of always calling Trim is preserved here.
  544. //
  545. // This may be subject (pun intended) of debate: should this policy
  546. // only be applied while the sequence is active? With the current
  547. // behavior, a sequence will "die out" after it has terminated by
  548. // continuing to drop OnNext notifications from the queue.
  549. //
  550. // In v1.x, this behavior was due to trimming based on the clock value
  551. // returned by scheduler.Now, applied to all but the terminal message
  552. // in the queue. Using the IStopwatch has the same effect. Either way,
  553. // we guarantee the final notification will be observed, but there's
  554. // no way to retain the buffer directly. One approach is to use the
  555. // time-based TakeLast operator and apply an unbounded ReplaySubject
  556. // to it.
  557. //
  558. // To conclude, we're keeping the behavior as-is for compatibility
  559. // reasons with v1.x.
  560. //
  561. _observers = _observers.Add(observer);
  562. ReplayBuffer(observer);
  563. if (_error != null)
  564. {
  565. observer.OnError(_error);
  566. }
  567. else if (_isStopped)
  568. {
  569. observer.OnCompleted();
  570. }
  571. }
  572. return subscription;
  573. }
  574. public void Unsubscribe(IObserver<T> observer)
  575. {
  576. lock (_gate)
  577. {
  578. if (!_isDisposed)
  579. {
  580. _observers = _observers.Remove(observer);
  581. }
  582. }
  583. }
  584. private void CheckDisposed()
  585. {
  586. if (_isDisposed)
  587. throw new ObjectDisposedException(string.Empty);
  588. }
  589. public void Dispose()
  590. {
  591. Dispose(true);
  592. }
  593. protected virtual void Dispose(bool disposing)
  594. {
  595. lock (_gate)
  596. {
  597. _isDisposed = true;
  598. _observers = null;
  599. }
  600. }
  601. }
  602. private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
  603. {
  604. private readonly Queue<T> _queue;
  605. protected ReplayManyBase(int queueSize)
  606. : base()
  607. {
  608. _queue = new Queue<T>(queueSize);
  609. }
  610. protected Queue<T> Queue
  611. {
  612. get
  613. {
  614. return _queue;
  615. }
  616. }
  617. protected override void AddValueToBuffer(T value)
  618. {
  619. _queue.Enqueue(value);
  620. }
  621. protected override void ReplayBuffer(IObserver<T> observer)
  622. {
  623. foreach (var item in _queue)
  624. observer.OnNext(item);
  625. }
  626. protected override void Dispose(bool disposing)
  627. {
  628. base.Dispose(disposing);
  629. _queue.Clear();
  630. }
  631. }
  632. }
  633. }