/*
 * Decompiled with CFR 0.152.
 */
package org.reactfx;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import javafx.beans.value.ObservableBooleanValue;
import javafx.concurrent.Task;
import javafx.concurrent.WorkerStateEvent;
import org.reactfx.AwaitingEventStream;
import org.reactfx.EventStream;
import org.reactfx.EventStreamBase;
import org.reactfx.Guard;
import org.reactfx.Subscription;
import org.reactfx.SuspendableNo;
import org.reactfx.util.TriConsumer;
import org.reactfx.util.Try;

/**
 * Event stream that awaits the completion of future-like values emitted by a
 * source stream and emits the outcome of each one as a {@link Try}.
 *
 * <p>For every value received from the source, a guard is obtained from
 * {@code pending.suspend()} and a completion handler is registered on the
 * value. When the handler is invoked, the result (or the error) is emitted,
 * unless the handler reports cancellation, and the guard is closed.
 *
 * @param <T> type of the result produced by the awaited values
 * @param <F> type of the awaited values emitted by the source stream
 */
class Await<T, F>
extends EventStreamBase<Try<T>>
implements AwaitingEventStream<Try<T>> {
    private final EventStream<F> source;
    // Suspended once per outstanding awaited value; see observeInputs().
    private final SuspendableNo pending = new SuspendableNo();
    private final BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler;

    /**
     * Creates a stream that awaits every {@link CompletionStage} emitted by
     * {@code source}.
     *
     * @param source stream of completion stages to await
     * @param clientThreadExecutor executor passed to
     *        {@link CompletionStage#whenCompleteAsync}; the completion
     *        handler, and therefore the emission, is submitted to it
     * @return stream of results of the awaited completion stages
     */
    public static <T> AwaitingEventStream<Try<T>> awaitCompletionStage(EventStream<CompletionStage<T>> source, Executor clientThreadExecutor) {
        // Fully parameterized type arguments: a raw CompletionStage here does
        // not match the EventStream<CompletionStage<T>> source.
        return new Await<T, CompletionStage<T>>(
                source,
                (future, handler) -> Await.addCompletionHandler(future, handler, clientThreadExecutor));
    }

    /**
     * Creates a stream that awaits every {@link Task} emitted by
     * {@code source}. Cancelled tasks produce no event.
     *
     * @param source stream of tasks to await
     * @return stream of results of the awaited tasks
     */
    public static <T> AwaitingEventStream<Try<T>> awaitTask(EventStream<Task<T>> source) {
        return new Await<T, Task<T>>(source, Await::addCompletionHandler);
    }

    /**
     * Registers {@code handler} to be called with the result or the error of
     * {@code future}. The cancellation flag passed to the handler is always
     * {@code false}.
     *
     * @param future completion stage to observe
     * @param handler receives (result, error, cancelled)
     * @param executor executor on which the handler is invoked
     */
    static <T> void addCompletionHandler(CompletionStage<T> future, TriConsumer<T, Throwable, Boolean> handler, Executor executor) {
        future.whenCompleteAsync((result, error) -> handler.accept(result, error, false), executor);
    }

    /**
     * Registers {@code handler} for the succeeded, failed and cancelled
     * worker state events of {@code task}.
     *
     * <ul>
     * <li>succeeded: {@code (task.getValue(), null, false)}</li>
     * <li>failed: {@code (null, task.getException(), false)}</li>
     * <li>cancelled: {@code (null, null, true)}</li>
     * </ul>
     *
     * @param task task to observe
     * @param handler receives (result, error, cancelled)
     */
    static <T> void addCompletionHandler(Task<T> task, TriConsumer<T, Throwable, Boolean> handler) {
        task.addEventHandler(WorkerStateEvent.WORKER_STATE_SUCCEEDED, event -> handler.accept(task.getValue(), null, false));
        task.addEventHandler(WorkerStateEvent.WORKER_STATE_FAILED, event -> handler.accept(null, task.getException(), false));
        task.addEventHandler(WorkerStateEvent.WORKER_STATE_CANCELLED, event -> handler.accept(null, null, true));
    }

    private Await(EventStream<F> source, BiConsumer<F, TriConsumer<T, Throwable, Boolean>> addCompletionHandler) {
        this.source = source;
        this.addCompletionHandler = addCompletionHandler;
    }

    /** Returns the observable backing {@link #isPending()}. */
    @Override
    public final ObservableBooleanValue pendingProperty() {
        return this.pending;
    }

    /** Returns the current value of the pending indicator. */
    @Override
    public final boolean isPending() {
        return this.pending.get();
    }

    /**
     * Subscribes to the source. Each received value suspends {@code pending}
     * and gets a completion handler that emits the outcome and then releases
     * the suspension.
     */
    @Override
    protected final Subscription observeInputs() {
        return this.source.subscribe(future -> {
            Guard guard = this.pending.suspend();
            this.addCompletionHandler.accept(future, (result, error, cancelled) -> {
                try {
                    // A cancelled computation yields no event at all.
                    if (!cancelled) {
                        this.emit(error == null ? Try.success(result) : Try.failure(error));
                    }
                } finally {
                    // Release the suspension even if emit throws, so that it
                    // is not left open for a value that has already completed.
                    guard.close();
                }
            });
        });
    }
}

