package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;

/* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.class */
/**
 * Mediates between callers and the autodetect process of a single job.
 *
 * <p>Write, update and flush requests are handed to the worker executor supplied at
 * construction time and their outcome is reported through a caller-provided
 * {@link BiConsumer}. Closing is also performed on that executor, but the calling
 * thread blocks until the close task has finished.
 */
public class AutodetectCommunicator implements Closeable {
    private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);

    /** Maximum time a single wait for a flush acknowledgement lasts before liveness is re-checked. */
    private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);

    private final Job job;
    private final OpenJobAction.JobTask jobTask;
    private final DataCountsReporter dataCountsReporter;
    private final AutodetectProcess autodetectProcess;
    private final AutoDetectResultProcessor autoDetectResultProcessor;
    /** Invoked once per {@link #close(boolean, String)} when the close task finishes. */
    private final Consumer<Exception> handler;
    private final ExecutorService autodetectWorkerExecutor;
    private final NamedXContentRegistry xContentRegistry;
    /** Set by {@link #killProcess()}; read by queued operations to short-circuit their work. */
    private volatile boolean processKilled;

    /**
     * Creates a communicator for the given job.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally package-private.
     *
     * @param job the job whose id, data description and analysis config are used
     * @param jobTask the task returned by {@link #getJobTask()}
     * @param autodetectProcess the process that control messages and data are written to
     * @param dataCountsReporter source of the running data counts
     * @param autoDetectResultProcessor the processor consulted for flush acknowledgements and model size stats
     * @param consumer callback invoked when a close task finishes
     * @param namedXContentRegistry registry passed to the data writer factory
     * @param executorService executor on which all operations against the process are run
     */
    public AutodetectCommunicator(Job job, OpenJobAction.JobTask jobTask, AutodetectProcess autodetectProcess, DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> consumer, NamedXContentRegistry namedXContentRegistry, ExecutorService executorService) {
        this.job = job;
        this.jobTask = jobTask;
        this.autodetectProcess = autodetectProcess;
        this.dataCountsReporter = dataCountsReporter;
        this.autoDetectResultProcessor = autoDetectResultProcessor;
        this.handler = consumer;
        this.xContentRegistry = namedXContentRegistry;
        this.autodetectWorkerExecutor = executorService;
    }

    /**
     * Writes the input header using a writer built from the job's own data description.
     *
     * @throws IOException if the writer fails to write the header
     */
    public void writeJobInputHeader() throws IOException {
        DataToProcessWriter headerWriter = createProcessWriter(Optional.empty());
        headerWriter.writeHeader();
    }

    /**
     * Builds a writer for the autodetect process.
     *
     * @param optional data description override; when empty the job's data description is used
     * @return the writer produced by {@link DataToProcessWriterFactory}
     */
    private DataToProcessWriter createProcessWriter(Optional<DataDescription> optional) {
        DataDescription effectiveDescription = optional.orElse(this.job.getDataDescription());
        // NOTE(review): the meaning of the leading 'true' flag is defined by the factory - confirm there.
        return DataToProcessWriterFactory.create(
                true,
                this.autodetectProcess,
                effectiveDescription,
                this.job.getAnalysisConfig(),
                this.dataCountsReporter,
                this.xContentRegistry);
    }

    /**
     * Queues a write of {@code inputStream} to the autodetect process.
     *
     * <p>The queued task optionally sends a reset-buckets control message, writes the data,
     * waits for the writer's callback, flushes the writer and then either rethrows the error
     * reported by the callback or returns the reported data counts.
     *
     * @param inputStream the data to write
     * @param xContentType content type handed to the writer
     * @param dataLoadParams parameters controlling bucket resetting and the data description
     * @param biConsumer receives the resulting data counts or the failure
     */
    public void writeToJob(InputStream inputStream, XContentType xContentType, DataLoadParams dataLoadParams, BiConsumer<DataCounts, Exception> biConsumer) {
        submitOperation(() -> {
            if (dataLoadParams.isResettingBuckets()) {
                this.autodetectProcess.writeResetBucketsControlMessage(dataLoadParams);
            }
            CountingInputStream countingStream = new CountingInputStream(inputStream, this.dataCountsReporter);
            DataToProcessWriter writer = createProcessWriter(dataLoadParams.getDataDescription());

            // The writer reports its outcome through a callback; capture it and wait for it here.
            CountDownLatch writeFinished = new CountDownLatch(1);
            AtomicReference<DataCounts> reportedCounts = new AtomicReference<>();
            AtomicReference<Exception> reportedFailure = new AtomicReference<>();
            writer.write(countingStream, xContentType, (dataCounts, exc) -> {
                reportedCounts.set(dataCounts);
                reportedFailure.set(exc);
                writeFinished.countDown();
            });
            writeFinished.await();

            // The stream is flushed before any reported failure is surfaced.
            writer.flushStream();

            Exception failure = reportedFailure.get();
            if (failure != null) {
                throw failure;
            }
            return reportedCounts.get();
        }, biConsumer);
    }

    /**
     * Closes the job without asking for a restart; equivalent to {@code close(false, null)}.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        close(false, null);
    }

    /**
     * Closes the autodetect process on the worker executor and blocks until that task is done.
     *
     * <p>The close task verifies the process is alive, closes it and waits for the result
     * processor to complete. Once the process close has been attempted the completion handler
     * is invoked exactly once, whether or not closing succeeded. The executor is shut down only
     * when the close task completed without error.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and
     * the method returns without shutting the executor down.
     *
     * @param z when {@code true} the completion handler receives an {@link ElasticsearchException}
     *          whose message is {@code str}; when {@code false} it receives {@code null}
     * @param str message for the exception passed to the completion handler when {@code z} is true
     * @throws IOException declared for callers; failures of the close task surface as the
     *         exception produced by {@link ExceptionsHelper#convertToElastic}
     */
    public void close(boolean z, String str) throws IOException {
        try {
            this.autodetectWorkerExecutor.submit(() -> {
                checkProcessIsAlive();
                try {
                    this.autodetectProcess.close();
                    this.autoDetectResultProcessor.awaitCompletion();
                } finally {
                    // A real finally (rather than a duplicated call in a catch-all) guarantees the
                    // handler is not invoked a second time if the handler itself or the log call throws.
                    this.handler.accept(z ? new ElasticsearchException(str) : null);
                }
                LOGGER.info("[{}] job closed", this.job.getId());
                return null;
            }).get();
            this.autodetectWorkerExecutor.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            throw ExceptionsHelper.convertToElastic(e2);
        }
    }

    /**
     * Marks this communicator as killed, notifies the result processor and kills the process.
     *
     * @throws IOException if killing the process fails
     */
    public void killProcess() throws IOException {
        this.processKilled = true;
        this.autoDetectResultProcessor.setProcessKilled();
        this.autodetectProcess.kill();
    }

    /**
     * Queues update messages for the autodetect process.
     *
     * @param modelPlotConfig new model plot config, or {@code null} to leave it untouched
     * @param list detector updates, or {@code null} for none; entries without rules are skipped
     * @param biConsumer notified when the messages have been written or writing failed
     */
    public void writeUpdateProcessMessage(ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> list, BiConsumer<Void, Exception> biConsumer) {
        submitOperation(() -> {
            if (modelPlotConfig != null) {
                this.autodetectProcess.writeUpdateModelPlotMessage(modelPlotConfig);
            }
            if (list != null) {
                for (JobUpdate.DetectorUpdate detectorUpdate : list) {
                    if (detectorUpdate.getRules() != null) {
                        this.autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getIndex(), detectorUpdate.getRules());
                    }
                }
            }
            return null;
        }, biConsumer);
    }

    /**
     * Queues a flush of the job and waits, on the worker, for the flush to be acknowledged.
     *
     * @param interimResultsParams parameters passed to the process's flush request
     * @param biConsumer notified when the flush has completed or failed
     */
    public void flushJob(InterimResultsParams interimResultsParams, BiConsumer<Void, Exception> biConsumer) {
        submitOperation(() -> {
            String flushId = this.autodetectProcess.flushJob(interimResultsParams);
            waitFlushToCompletion(flushId);
            return null;
        }, biConsumer);
    }

    /**
     * Blocks until the flush identified by {@code str} is acknowledged.
     *
     * <p>Between waits of {@link #FLUSH_PROCESS_CHECK_FREQUENCY} the process and the result
     * processor are checked for liveness; either check throws if its target has died. Unless
     * the process was killed, the method then waits for the renormalizer to become idle. The
     * awaiting-flush entry is always cleared on exit.
     *
     * @param str id of the flush to wait for
     */
    void waitFlushToCompletion(String str) {
        LOGGER.debug("[{}] waiting for flush", this.job.getId());
        try {
            while (!this.autoDetectResultProcessor.waitForFlushAcknowledgement(str, FLUSH_PROCESS_CHECK_FREQUENCY)) {
                checkProcessIsAlive();
                checkResultsProcessorIsAlive();
            }
            if (!this.processKilled) {
                this.autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
                LOGGER.debug("[{}] Flush completed", this.job.getId());
            }
        } finally {
            this.autoDetectResultProcessor.clearAwaitingFlush(str);
        }
    }

    /**
     * Throws if the autodetect process is no longer alive, logging the process's error output.
     *
     * <p>NOTE(review): the decompiler reports this method was originally private.
     *
     * @throws ElasticsearchException if the process is not alive
     */
    public void checkProcessIsAlive() {
        if (!this.autodetectProcess.isProcessAlive()) {
            ParameterizedMessage deathNotice = new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", this.job.getId(), this.autodetectProcess.readError());
            LOGGER.error(deathNotice);
            throw new ElasticsearchException(deathNotice.getFormattedMessage());
        }
    }

    /**
     * Throws if the result processor reports that it has failed.
     *
     * @throws ElasticsearchException if the result processor has failed
     */
    private void checkResultsProcessorIsAlive() {
        if (!this.autoDetectResultProcessor.isFailed()) {
            return;
        }
        ParameterizedMessage deathNotice = new ParameterizedMessage("[{}] Unexpected death of the result processor", this.job.getId());
        LOGGER.error(deathNotice);
        throw new ElasticsearchException(deathNotice.getFormattedMessage());
    }

    /** @return the job task supplied at construction time */
    public OpenJobAction.JobTask getJobTask() {
        return this.jobTask;
    }

    /** @return the start time reported by the autodetect process */
    public ZonedDateTime getProcessStartTime() {
        return this.autodetectProcess.getProcessStartTime();
    }

    /** @return the model size stats currently held by the result processor */
    public ModelSizeStats getModelSizeStats() {
        return this.autoDetectResultProcessor.modelSizeStats();
    }

    /** @return the running total data counts from the data counts reporter */
    public DataCounts getDataCounts() {
        return this.dataCountsReporter.runningTotalStats();
    }

    /**
     * Runs {@code checkedSupplier} on the worker executor and reports the outcome to {@code biConsumer}.
     *
     * <p>If the process has been killed, the supplier is skipped (or its failure is discarded)
     * and the consumer receives {@code (null, null)}. Otherwise the process is checked for
     * liveness, the supplier is run and its result is passed on; failures are logged and
     * passed as the second argument.
     *
     * @param checkedSupplier the operation to run
     * @param biConsumer receives the operation's result or failure
     * @param <T> result type of the operation
     */
    private <T> void submitOperation(final CheckedSupplier<T, Exception> checkedSupplier, final BiConsumer<T, Exception> biConsumer) {
        this.autodetectWorkerExecutor.execute(new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator.1
            public void onFailure(Exception exc) {
                if (AutodetectCommunicator.this.processKilled) {
                    // Errors after a deliberate kill are expected and are not reported as failures.
                    biConsumer.accept(null, null);
                    return;
                }
                AutodetectCommunicator.LOGGER.error(new ParameterizedMessage("[{}] Unexpected exception writing to process", AutodetectCommunicator.this.job.getId()), exc);
                biConsumer.accept(null, exc);
            }

            protected void doRun() throws Exception {
                if (AutodetectCommunicator.this.processKilled) {
                    biConsumer.accept(null, null);
                    return;
                }
                AutodetectCommunicator.this.checkProcessIsAlive();
                biConsumer.accept(checkedSupplier.get(), null);
            }
        });
    }
}
