/*
 * Decompiled with CFR 0.152.
 */
package org.framework.wu.framework.queue.listener;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.framework.wu.framework.queue.Message;
import org.framework.wu.framework.queue.MessageQueue;
import org.framework.wu.framework.queue.MessageQueueFactory;
import org.framework.wu.framework.queue.listener.QueueListener;

/**
 * Starts one dedicated consumer thread per registered {@link QueueListener}.
 *
 * <p>Each consumer thread resolves its queue through
 * {@link MessageQueueFactory#getQueue}, then loops without a terminating
 * condition: it receives a message, applies the listener's topic filter and
 * hands matching messages to {@link QueueListener#listener}.
 */
public class QueueListenerRunner
implements Runnable {
    private final List<QueueListener> queueListeners;
    /**
     * Bounded pool kept as a package-visible field for compatibility with any
     * code that references it. {@link #run()} does not submit to it: consumer
     * loops never finish, so each one gets its own thread instead of
     * permanently occupying a pool slot.
     */
    ThreadPoolExecutor QUEUE_EXECUTOR = new ThreadPoolExecutor(10, 20, 3L, TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>(50), new ThreadPoolExecutor.AbortPolicy());

    /**
     * @param queueListeners listeners to start when {@link #run()} is invoked;
     *                       the list is stored as-is (no defensive copy)
     */
    public QueueListenerRunner(List<QueueListener> queueListeners) {
        this.queueListeners = queueListeners;
    }

    /**
     * Creates and starts one new thread for every configured listener.
     *
     * <p>The threads are started directly. Routing {@code Thread::start}
     * through the executor added nothing except idle pool threads, a discarded
     * {@code Future}, and the possibility of a rejected submission aborting
     * the start-up loop part-way through.
     */
    @Override
    public void run() {
        for (QueueListener queueListener : this.queueListeners) {
            Thread thread = new Thread(() -> QueueListenerRunner.consume(queueListener));
            thread.start();
        }
    }

    /**
     * Consumer loop for a single listener; only exits by throwing.
     *
     * @param queueListener listener supplying the queue name, the optional
     *                      topic filter and the message callback
     * @throws RuntimeException wrapping any exception thrown by the listener
     *                          callback
     */
    private static void consume(QueueListener queueListener) {
        String queueName = queueListener.queueName();
        String topic = queueListener.topic();
        MessageQueue receiverQueue = MessageQueueFactory.getQueue(queueName);
        while (true) {
            Message receive = receiverQueue.receive();
            // A null listener topic means "accept every message". Short-circuit
            // evaluation and calling equals() on the non-null listener topic
            // keep a message without a topic from raising a NullPointerException.
            boolean accepted = Objects.isNull(topic) || topic.equals(receive.getTopic());
            if (!accepted) {
                continue;
            }
            try {
                queueListener.listener(receive);
            }
            catch (Exception e) {
                // NOTE(review): rethrowing ends this listener's thread, so the
                // listener stops consuming after its first failure. Confirm
                // that this is the intended policy.
                throw new RuntimeException("Listener for queue '" + queueName + "' failed", e);
            }
        }
    }
}

