/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.QueryResponseHandler;
import io.confluent.ksql.api.client.impl.RowImpl;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamQueryResponseHandler
extends QueryResponseHandler<CompletableFuture<StreamedQueryResult>> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamQueryResponseHandler.class);
    private StreamedQueryResultImpl queryResult;
    private Map<String, Integer> columnNameToIndex;
    private boolean paused;
    private AtomicReference<String> serializedConsistencyVector;

    StreamQueryResponseHandler(Context context, RecordParser recordParser, CompletableFuture<StreamedQueryResult> cf, AtomicReference<String> serializedCV) {
        super(context, recordParser, cf);
        this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
    }

    @Override
    protected void handleMetadata(QueryResponseMetadata queryResponseMetadata) {
        this.queryResult = new StreamedQueryResultImpl(this.context, queryResponseMetadata.queryId, queryResponseMetadata.columnNames, RowUtil.columnTypesFromStrings(queryResponseMetadata.columnTypes));
        this.columnNameToIndex = RowUtil.valueToIndexMap(queryResponseMetadata.columnNames);
        this.cf.complete(this.queryResult);
    }

    @Override
    protected void handleRow(Buffer buff) {
        if (this.queryResult == null) {
            throw new IllegalStateException("handleRow called before metadata processed");
        }
        Object json = buff.toJson();
        if (json instanceof JsonArray) {
            RowImpl row = new RowImpl(this.queryResult.columnNames(), this.queryResult.columnTypes(), (JsonArray)json, this.columnNameToIndex);
            boolean full = this.queryResult.accept(row);
            if (full && !this.paused) {
                this.recordParser.pause();
                this.queryResult.drainHandler(this::publisherReceptive);
                this.paused = true;
            }
        } else if (json instanceof JsonObject) {
            JsonObject jsonObject = (JsonObject)json;
            if (jsonObject.getMap() != null && jsonObject.getMap().containsKey("consistencyToken")) {
                LOG.info("Response contains consistency vector " + jsonObject);
                this.serializedConsistencyVector.set((String)((JsonObject)json).getMap().get("consistencyToken"));
            } else {
                this.queryResult.handleError(new KsqlException(jsonObject.getString("message")));
            }
        } else {
            throw new RuntimeException("Could not decode JSON: " + json);
        }
    }

    @Override
    protected void doHandleBodyEnd() {
        this.queryResult.complete();
    }

    @Override
    public void handleExceptionAfterFutureCompleted(Throwable t) {
        this.queryResult.handleError(new Exception(t));
    }

    private void publisherReceptive() {
        this.checkContext();
        this.paused = false;
        this.recordParser.resume();
    }
}

