package org.elasticsearch.xpack.watcher.trigger.schedule.engine;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.class */
public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
    public static final Setting<TimeValue> TICKER_INTERVAL_SETTING;
    private final TimeValue tickInterval;
    private volatile Map<String, ActiveSchedule> schedules;
    private Ticker ticker;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine$ActiveSchedule.class */
    public static class ActiveSchedule {
        private final String name;
        private final Schedule schedule;
        private final long startTime;
        private volatile long scheduledTime;

        ActiveSchedule(String str, Schedule schedule, long j) {
            this.name = str;
            this.schedule = schedule;
            this.startTime = j;
            this.scheduledTime = schedule.nextScheduledTimeAfter(j, j);
        }

        public long check(long j) {
            if (j < this.scheduledTime) {
                return -1L;
            }
            long j2 = this.scheduledTime == 0 ? j : this.scheduledTime;
            this.scheduledTime = this.schedule.nextScheduledTimeAfter(this.startTime, j);
            return j2;
        }
    }

    /* loaded from: input_file:org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine$Ticker.class */
    class Ticker extends Thread {
        private volatile boolean active;
        private final CountDownLatch closeLatch;

        Ticker() {
            super("ticker-schedule-trigger-engine");
            this.active = true;
            this.closeLatch = new CountDownLatch(1);
            setDaemon(true);
            start();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.active) {
                TickerScheduleTriggerEngine.this.logger.trace("checking jobs [{}]", new DateTime(TickerScheduleTriggerEngine.this.clock.millis(), DateTimeZone.UTC));
                TickerScheduleTriggerEngine.this.checkJobs();
                try {
                    sleep(TickerScheduleTriggerEngine.this.tickInterval.millis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            this.closeLatch.countDown();
        }

        public void close() {
            TickerScheduleTriggerEngine.this.logger.trace("stopping ticker thread");
            this.active = false;
            try {
                this.closeLatch.await();
            } catch (InterruptedException e) {
                TickerScheduleTriggerEngine.this.logger.warn("caught an interrupted exception when waiting while closing ticker thread", e);
                Thread.currentThread().interrupt();
            }
            TickerScheduleTriggerEngine.this.logger.trace("ticker thread stopped");
        }
    }

    public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
        super(settings, scheduleRegistry, clock);
        this.tickInterval = (TimeValue) TICKER_INTERVAL_SETTING.get(settings);
        this.schedules = new ConcurrentHashMap();
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public void start(Collection<Watch> collection) {
        long millis = this.clock.millis();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (Watch watch : collection) {
            if (watch.trigger() instanceof ScheduleTrigger) {
                concurrentHashMap.put(watch.id(), new ActiveSchedule(watch.id(), ((ScheduleTrigger) watch.trigger()).getSchedule(), millis));
            }
        }
        this.schedules = concurrentHashMap;
        this.ticker = new Ticker();
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public void stop() {
        this.ticker.close();
        pauseExecution();
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public void add(Watch watch) {
        if (!$assertionsDisabled && !(watch.trigger() instanceof ScheduleTrigger)) {
            throw new AssertionError();
        }
        this.schedules.put(watch.id(), new ActiveSchedule(watch.id(), ((ScheduleTrigger) watch.trigger()).getSchedule(), this.clock.millis()));
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public void pauseExecution() {
        this.schedules.clear();
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public int getJobCount() {
        return this.schedules.size();
    }

    @Override // org.elasticsearch.xpack.watcher.trigger.TriggerEngine
    public boolean remove(String str) {
        return this.schedules.remove(str) != null;
    }

    void checkJobs() {
        long millis = this.clock.millis();
        ArrayList arrayList = new ArrayList();
        for (ActiveSchedule activeSchedule : this.schedules.values()) {
            long check = activeSchedule.check(millis);
            if (check > 0) {
                this.logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", activeSchedule.name, new DateTime(millis, DateTimeZone.UTC), new DateTime(check, DateTimeZone.UTC));
                arrayList.add(new ScheduleTriggerEvent(activeSchedule.name, new DateTime(millis, DateTimeZone.UTC), new DateTime(check, DateTimeZone.UTC)));
                if (arrayList.size() >= 1000) {
                    notifyListeners(arrayList);
                    arrayList.clear();
                }
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        notifyListeners(arrayList);
    }

    protected void notifyListeners(List<TriggerEvent> list) {
        this.consumers.forEach(consumer -> {
            consumer.accept(list);
        });
    }

    static {
        $assertionsDisabled = !TickerScheduleTriggerEngine.class.desiredAssertionStatus();
        TICKER_INTERVAL_SETTING = Setting.positiveTimeSetting("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500L), new Setting.Property[]{Setting.Property.NodeScope});
    }
}
