// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that races an array of source sequences; the actual race is
    /// delegated to <see cref="AmbCoordinator{T}.Create"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the sources and of the result.</typeparam>
    internal sealed class AmbManyArray<T> : BasicProducer<T>
    {
        private readonly IObservable<T>[] _sources;

        /// <param name="sources">The sequences to race. The array is stored as-is (not copied).</param>
        public AmbManyArray(IObservable<T>[] sources)
        {
            _sources = sources;
        }

        /// <summary>Starts the race for <paramref name="observer"/>.</summary>
        /// <returns>The disposable returned by <see cref="AmbCoordinator{T}.Create"/>.</returns>
        protected override IDisposable Run(IObserver<T> observer)
        {
            return AmbCoordinator<T>.Create(observer, _sources);
        }
    }

    /// <summary>
    /// Producer that races an enumerable of source sequences. The enumerable is
    /// materialized into an array on every <see cref="Run"/> call.
    /// </summary>
    /// <typeparam name="T">Element type of the sources and of the result.</typeparam>
    internal sealed class AmbManyEnumerable<T> : BasicProducer<T>
    {
        private readonly IEnumerable<IObservable<T>> _sources;

        /// <param name="sources">The sequences to race; enumerated lazily, once per <see cref="Run"/>.</param>
        public AmbManyEnumerable(IEnumerable<IObservable<T>> sources)
        {
            _sources = sources;
        }

        /// <summary>
        /// Materializes the source enumerable and starts the race for <paramref name="observer"/>.
        /// </summary>
        /// <returns>
        /// <see cref="Disposable.Empty"/> if enumerating the sources threw (the exception is
        /// delivered through <c>OnError</c>); otherwise the disposable returned by
        /// <see cref="AmbCoordinator{T}.Create"/>.
        /// </returns>
        protected override IDisposable Run(IObserver<T> observer)
        {
            var sourcesEnumerable = _sources;
            IObservable<T>[] sources;

            try
            {
                sources = sourcesEnumerable.ToArray();
            }
            catch (Exception ex)
            {
                // Enumeration runs user code; route its failure to the observer
                // instead of letting it escape from the subscribe call.
                observer.OnError(ex);
                return Disposable.Empty;
            }

            return AmbCoordinator<T>.Create(observer, sources);
        }
    }

    /// <summary>
    /// Coordinates a race between several sources: the first inner observer that
    /// receives any notification (OnNext, OnError or OnCompleted) claims the
    /// <c>_winner</c> slot, every other inner observer is disposed, and from then on
    /// only the winner's notifications are forwarded downstream.
    /// </summary>
    /// <typeparam name="T">Element type of the sources and of the downstream observer.</typeparam>
    internal sealed class AmbCoordinator<T> : IDisposable
    {
        private readonly IObserver<T> _downstream;

        // One slot per source. A slot is swapped to null when its observer is
        // disposed (by Dispose or by TryWin), so each observer is disposed at most
        // once through this array.
        private readonly InnerObserver?[] _observers;

        // Index of the winning source, or -1 while no source has won yet.
        private int _winner;

        /// <summary>Creates a coordinator with <paramref name="n"/> inner observers.</summary>
        /// <param name="downstream">Observer that receives the winner's notifications.</param>
        /// <param name="n">Number of sources taking part in the race.</param>
        internal AmbCoordinator(IObserver<T> downstream, int n)
        {
            _downstream = downstream;

            var o = new InnerObserver?[n];
            for (var i = 0; i < n; i++)
            {
                o[i] = new InnerObserver(this, i);
            }

            _observers = o;
            Volatile.Write(ref _winner, -1);
        }

        /// <summary>
        /// Starts a race between <paramref name="sources"/> on behalf of <paramref name="observer"/>.
        /// </summary>
        /// <returns>
        /// <see cref="Disposable.Empty"/> for zero sources (the observer is completed immediately);
        /// the direct subscription for exactly one source (no coordinator is allocated);
        /// otherwise the coordinator, whose disposal disposes all remaining inner observers.
        /// </returns>
        internal static IDisposable Create(IObserver<T> observer, IObservable<T>[] sources)
        {
            var n = sources.Length;

            if (n == 0)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            if (n == 1)
            {
                return sources[0].Subscribe(observer);
            }

            var parent = new AmbCoordinator<T>(observer, n);
            parent.Subscribe(sources);
            return parent;
        }

        /// <summary>
        /// Hands each source to its inner observer, in index order, via <c>Run</c>.
        /// </summary>
        /// <param name="sources">The sources; <c>sources[i]</c> is paired with inner observer <c>i</c>.</param>
        internal void Subscribe(IObservable<T>[] sources)
        {
            for (var i = 0; i < _observers.Length; i++)
            {
                var inner = Volatile.Read(ref _observers[i]);

                // A null slot was cleared by Dispose or by TryWin (a source that was
                // already started has won), so stop starting further sources.
                if (inner == null)
                {
                    break;
                }

                inner.Run(sources[i]);
            }
        }

        /// <summary>Disposes every inner observer that is still present in its slot.</summary>
        public void Dispose()
        {
            for (var i = 0; i < _observers.Length; i++)
            {
                Interlocked.Exchange(ref _observers[i], null)?.Dispose();
            }
        }

        /// <summary>
        /// Attempts to make the source at <paramref name="index"/> the winner.
        /// </summary>
        /// <returns>
        /// <c>true</c> if this call moved <c>_winner</c> from -1 to <paramref name="index"/>,
        /// in which case all other inner observers have been disposed; <c>false</c> if a
        /// winner had already been chosen.
        /// </returns>
        private bool TryWin(int index)
        {
            // Plain read first so that late losers skip the compare-exchange.
            if (Volatile.Read(ref _winner) == -1 && Interlocked.CompareExchange(ref _winner, index, -1) == -1)
            {
                for (var i = 0; i < _observers.Length; i++)
                {
                    if (index != i)
                    {
                        Interlocked.Exchange(ref _observers[i], null)?.Dispose();
                    }
                }

                return true;
            }

            return false;
        }

        /// <summary>
        /// Observer attached to a single source. It forwards notifications downstream
        /// only once its source has won the race; a losing observer disposes itself.
        /// </summary>
        internal sealed class InnerObserver : IdentitySink<T>
        {
            private readonly AmbCoordinator<T> _parent;
            private readonly int _index;

            // Cached "this observer is the winner" flag; it is only read and written
            // from this observer's own notification methods.
            private bool _won;

            /// <param name="parent">The coordinator running the race.</param>
            /// <param name="index">Position of this observer's source in the coordinator's array.</param>
            public InnerObserver(AmbCoordinator<T> parent, int index)
                : base(parent._downstream)
            {
                _parent = parent;
                _index = index;
            }

            /// <summary>Forwards completion if this source is (or just became) the winner.</summary>
            public override void OnCompleted()
            {
                if (EnsureWinner())
                {
                    ForwardOnCompleted();
                }
            }

            /// <summary>Forwards <paramref name="error"/> if this source is (or just became) the winner.</summary>
            public override void OnError(Exception error)
            {
                if (EnsureWinner())
                {
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards <paramref name="value"/> if this source is (or just became) the winner.</summary>
            public override void OnNext(T value)
            {
                if (EnsureWinner())
                {
                    ForwardOnNext(value);
                }
            }

            /// <summary>
            /// Decides whether the current notification may be forwarded.
            /// </summary>
            /// <returns>
            /// <c>true</c> if this observer had already won or wins now;
            /// <c>false</c> if another source won, in which case this observer
            /// has been disposed before returning.
            /// </returns>
            private bool EnsureWinner()
            {
                if (_won)
                {
                    return true;
                }

                if (_parent.TryWin(_index))
                {
                    _won = true;
                    return true;
                }

                Dispose();
                return false;
            }
        }
    }
}