// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Text;
using System.Threading;
using System.Linq;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that races the observables of a fixed array against each other
    /// by handing them to <see cref="AmbCoordinator{T}.Create"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequences.</typeparam>
    internal sealed class AmbManyArray<T> : BasicProducer<T>
    {
        readonly IObservable<T>[] sources;

        /// <summary>Stores the array of candidate sources; the array is not copied.</summary>
        /// <param name="sources">Sources that will compete once an observer subscribes.</param>
        public AmbManyArray(IObservable<T>[] sources)
        {
            this.sources = sources;
        }

        /// <summary>Starts the race for the given observer.</summary>
        /// <param name="observer">Observer that receives the winning source's signals.</param>
        /// <returns>The disposable returned by <see cref="AmbCoordinator{T}.Create"/>.</returns>
        protected override IDisposable Run(IObserver<T> observer)
        {
            return AmbCoordinator<T>.Create(observer, sources);
        }
    }

    /// <summary>
    /// Producer that materializes an enumerable of observables into an array on
    /// each subscription and then races them via <see cref="AmbCoordinator{T}.Create"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequences.</typeparam>
    internal sealed class AmbManyEnumerable<T> : BasicProducer<T>
    {
        readonly IEnumerable<IObservable<T>> sources;

        /// <summary>Stores the enumerable of candidate sources; it is not enumerated here.</summary>
        /// <param name="sources">Enumerable that is converted to an array in <see cref="Run"/>.</param>
        public AmbManyEnumerable(IEnumerable<IObservable<T>> sources)
        {
            this.sources = sources;
        }

        /// <summary>
        /// Enumerates the sources and starts the race. If enumeration throws, the
        /// exception is forwarded to <paramref name="observer"/> via <c>OnError</c>
        /// and an empty disposable is returned.
        /// </summary>
        /// <param name="observer">Observer that receives the winning source's signals.</param>
        /// <returns>A disposable for the race, or <c>Disposable.Empty</c> on enumeration failure.</returns>
        protected override IDisposable Run(IObserver<T> observer)
        {
            var sourcesEnumerable = this.sources;
            var sources = default(IObservable<T>[]);

            try
            {
                sources = sourcesEnumerable.ToArray();
            }
            catch (Exception ex)
            {
                // Enumerating user-supplied sequences may fail; report it through the observer
                // instead of letting the exception escape from the subscription call.
                observer.OnError(ex);
                return Disposable.Empty;
            }

            return AmbCoordinator<T>.Create(observer, sources);
        }
    }

    /// <summary>
    /// Holds one <see cref="InnerObserver"/> per source and decides which of them
    /// "wins": the first inner observer to receive any signal claims
    /// <c>winner</c> with a compare-and-swap; all other inner observers are
    /// then disposed and removed from the array.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequences.</typeparam>
    internal sealed class AmbCoordinator<T> : IDisposable
    {
        readonly IObserver<T> downstream;

        // Slots are swapped to null (via Interlocked.Exchange) when the corresponding
        // observer is disposed by Dispose() or by a winning sibling in TryWin().
        readonly InnerObserver[] observers;

        // Index of the winning inner observer, or -1 while there is no winner yet.
        int winner;

        /// <summary>Creates <paramref name="n"/> inner observers and marks the race as undecided.</summary>
        /// <param name="downstream">Observer that receives the winner's signals.</param>
        /// <param name="n">Number of competing sources.</param>
        internal AmbCoordinator(IObserver<T> downstream, int n)
        {
            this.downstream = downstream;

            var o = new InnerObserver[n];
            for (int i = 0; i < n; i++)
            {
                o[i] = new InnerObserver(this, i);
            }
            observers = o;

            Volatile.Write(ref winner, -1);
        }

        /// <summary>
        /// Sets up the race for <paramref name="observer"/> over <paramref name="sources"/>.
        /// </summary>
        /// <param name="observer">Observer that receives the winning source's signals.</param>
        /// <param name="sources">Competing sources.</param>
        /// <returns>
        /// <c>Disposable.Empty</c> after completing the observer when there are no sources;
        /// the direct subscription when there is exactly one source; otherwise a new
        /// coordinator that has already subscribed to every source.
        /// </returns>
        internal static IDisposable Create(IObserver<T> observer, IObservable<T>[] sources)
        {
            var n = sources.Length;

            if (n == 0)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            if (n == 1)
            {
                // A single source has nothing to race against; no coordinator is needed.
                return sources[0].Subscribe(observer);
            }

            var parent = new AmbCoordinator<T>(observer, n);
            parent.Subscribe(sources);
            return parent;
        }

        /// <summary>
        /// Subscribes each inner observer to the source with the same index, stopping
        /// at the first slot that has already been cleared.
        /// </summary>
        /// <param name="sources">Sources, index-aligned with the inner observers.</param>
        internal void Subscribe(IObservable<T>[] sources)
        {
            for (var i = 0; i < observers.Length; i++)
            {
                var inner = Volatile.Read(ref observers[i]);
                if (inner == null)
                {
                    // The slot was cleared by Dispose() or TryWin(); stop subscribing further.
                    break;
                }

                inner.OnSubscribe(sources[i].Subscribe(inner));
            }
        }

        /// <summary>Disposes every inner observer that is still present and clears its slot.</summary>
        public void Dispose()
        {
            for (var i = 0; i < observers.Length; i++)
            {
                Interlocked.Exchange(ref observers[i], null)?.Dispose();
            }
        }

        /// <summary>
        /// Attempts to make the inner observer at <paramref name="index"/> the winner.
        /// On success, every other inner observer is disposed and its slot cleared.
        /// </summary>
        /// <param name="index">Index of the inner observer that received a signal.</param>
        /// <returns><c>true</c> if this call claimed the win; otherwise <c>false</c>.</returns>
        bool TryWin(int index)
        {
            // The plain volatile read skips the compare-and-swap once a winner is already set.
            if (Volatile.Read(ref winner) == -1
                && Interlocked.CompareExchange(ref winner, index, -1) == -1)
            {
                for (var i = 0; i < observers.Length; i++)
                {
                    if (index != i)
                    {
                        Interlocked.Exchange(ref observers[i], null)?.Dispose();
                    }
                }
                return true;
            }

            return false;
        }

        /// <summary>
        /// Observer attached to one competing source. It forwards signals downstream
        /// only after it has won the race through <see cref="AmbCoordinator{T}.TryWin"/>.
        /// </summary>
        internal sealed class InnerObserver : IObserver<T>, IDisposable
        {
            readonly IObserver<T> downstream;
            readonly AmbCoordinator<T> parent;
            readonly int index;

            // Subscription to the source; replaced by BooleanDisposable.True on Dispose().
            IDisposable upstream;

            // Set once this observer has won; read and written only from the On* callbacks.
            // NOTE(review): plain (non-volatile) field — presumably relies on the source
            // delivering its notifications sequentially; confirm.
            bool won;

            /// <summary>Creates the inner observer for the source at <paramref name="index"/>.</summary>
            /// <param name="parent">Coordinator that arbitrates the race.</param>
            /// <param name="index">Position of this observer in the coordinator's array.</param>
            public InnerObserver(AmbCoordinator<T> parent, int index)
            {
                this.downstream = parent.downstream;
                this.parent = parent;
                this.index = index;
            }

            /// <summary>
            /// Swaps <c>BooleanDisposable.True</c> into the upstream slot and disposes
            /// whatever was stored there before, if anything.
            /// </summary>
            public void Dispose()
            {
                Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
            }

            /// <summary>
            /// Forwards completion downstream if this observer has won or wins now;
            /// disposes the upstream slot in every case.
            /// </summary>
            public void OnCompleted()
            {
                if (won)
                {
                    downstream.OnCompleted();
                }
                else if (parent.TryWin(index))
                {
                    won = true;
                    downstream.OnCompleted();
                }

                Dispose();
            }

            /// <summary>
            /// Forwards the error downstream if this observer has won or wins now;
            /// disposes the upstream slot in every case.
            /// </summary>
            /// <param name="error">Error reported by the source.</param>
            public void OnError(Exception error)
            {
                if (won)
                {
                    downstream.OnError(error);
                }
                else if (parent.TryWin(index))
                {
                    won = true;
                    downstream.OnError(error);
                }

                Dispose();
            }

            /// <summary>
            /// Forwards the value downstream if this observer has won or wins now;
            /// otherwise disposes the upstream slot because another observer has won.
            /// </summary>
            /// <param name="value">Value emitted by the source.</param>
            public void OnNext(T value)
            {
                if (won)
                {
                    downstream.OnNext(value);
                }
                else if (parent.TryWin(index))
                {
                    won = true;
                    downstream.OnNext(value);
                }
                else
                {
                    Dispose();
                }
            }

            /// <summary>
            /// Stores the source subscription if the upstream slot is still empty;
            /// otherwise disposes <paramref name="d"/> immediately.
            /// </summary>
            /// <param name="d">Subscription returned by the source's <c>Subscribe</c> call.</param>
            internal void OnSubscribe(IDisposable d)
            {
                // A non-null slot means Dispose() already ran (or a subscription is already
                // stored), so the incoming subscription must not be kept alive.
                if (Interlocked.CompareExchange(ref upstream, d, null) != null)
                {
                    d?.Dispose();
                }
            }
        }
    }
}