package com.chuangjiangx.member.business.stored.ddd.domain.subscribe.mq;

import com.alibaba.fastjson.JSON;
import com.chuangjiangx.member.business.common.RedisConst;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Listens on Redis list queues for member pay and refund events and hands each event to a
 * shared worker pool.
 *
 * <p>Each listener loop atomically moves one payload from a source list to a "work" list
 * (blocking until one is available), then submits a task that parses the payload into an
 * {@link MbrEventParam}, passes it to the supplied handler and finally removes the payload
 * from the work list.
 *
 * <p>NOTE(review): the pool uses {@link ThreadPoolExecutor.DiscardOldestPolicy} with a bounded
 * queue, so under sustained overload the oldest queued task is dropped and its payload stays in
 * the work list. Confirm that something re-drives stale work-list entries.
 */
@Component
/* loaded from: input_file:WEB-INF/lib/member-module-5.3.26.jar:com/chuangjiangx/member/business/stored/ddd/domain/subscribe/mq/MbrEventProcessExecutor.class */
public class MbrEventProcessExecutor {
    private static final Logger log = LoggerFactory.getLogger(MbrEventProcessExecutor.class);

    /** Maximum number of tasks that may wait in the worker pool's queue. */
    private static final int WORK_QUEUE_CAPACITY = 2048;

    /** Seconds to wait before polling Redis again after a connection failure. */
    private static final long RECONNECT_DELAY_SECONDS = 6L;

    /** Guards lazy creation of {@link #workExecutor}; static because the guarded field is static. */
    private static final Object LOCK = new Object();

    /**
     * Worker pool shared by all listener loops. Declared {@code volatile} so that the
     * unsynchronized first check in {@link #init()} never observes a partially constructed pool.
     */
    protected static volatile ThreadPoolExecutor workExecutor;

    @Autowired
    private MbrPayHandler mbrPayHandler;

    @Autowired
    private MbrRefundHandler mbrRefundHandler;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Lazily creates the shared worker pool. Safe to call repeatedly and from several threads;
     * only the first call creates the pool.
     */
    public void init() {
        if (workExecutor != null) {
            return;
        }
        synchronized (LOCK) {
            if (workExecutor != null) {
                return;
            }
            BasicThreadFactory.Builder namingPattern =
                    new BasicThreadFactory.Builder().daemon(true).namingPattern("mbr-pay-event-process-%d");
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            log.info("服务器CPU数量:{}", availableProcessors);
            workExecutor = new ThreadPoolExecutor(
                    availableProcessors,
                    2 * availableProcessors,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(WORK_QUEUE_CAPACITY),
                    namingPattern.build(),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
        }
    }

    /**
     * Starts the pay-event listener loop on the {@code threadPoolExecutor} async executor.
     * Does not return until the listening thread is interrupted.
     */
    @Async("threadPoolExecutor")
    public void processPayEvent() {
        init();
        // A bound method reference null-checks its receiver when it is created.
        eventThread(this.mbrPayHandler::handler, RedisConst.MBR_PAY_QUEUE, RedisConst.MBR_PAY_QUEUE_WORK);
    }

    /**
     * Starts the refund-event listener loop on the {@code threadPoolExecutor} async executor.
     * Does not return until the listening thread is interrupted.
     */
    @Async("threadPoolExecutor")
    public void processRefundEvent() {
        init();
        // A bound method reference null-checks its receiver when it is created.
        eventThread(this.mbrRefundHandler::handler, RedisConst.MBR_REFUND_QUEUE, RedisConst.MBR_REFUND_QUEUE_WORK);
    }

    /**
     * Initiates an orderly shutdown of the worker pool when the bean is destroyed.
     * The misspelled method name is kept so existing callers continue to compile.
     */
    @PreDestroy
    public void destory() {
        if (workExecutor != null) {
            workExecutor.shutdown();
        }
    }

    /**
     * Runs a listener loop: moves payloads from list {@code str} to list {@code str2} and
     * submits each non-blank payload to the worker pool for handling.
     *
     * <p>The loop ends when the current thread is interrupted, either between polls or while
     * waiting to retry after a Redis connection failure. Other runtime exceptions thrown by the
     * Redis call propagate to the caller.
     *
     * @param consumer handler invoked with each successfully parsed event
     * @param str      key of the Redis list events are read from
     * @param str2     key of the Redis list that holds events while they are being processed
     */
    protected void eventThread(Consumer<MbrEventParam> consumer, String str, String str2) {
        // The injected template is raw; payloads are read back as String exactly as before.
        @SuppressWarnings("unchecked")
        ListOperations<String, String> opsForList = this.redisTemplate.opsForList();
        log.info("开始监听redis queue事件...");
        log.info("线程:{}", Thread.currentThread().getName());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Timeout 0 blocks until an element is available.
                String payload = opsForList.rightPopAndLeftPush(str, str2, 0L, TimeUnit.MILLISECONDS);
                log.info("读取到event:{}", payload);
                if (StringUtils.isNotBlank(payload)) {
                    workExecutor.execute(() -> handleEvent(consumer, opsForList, str2, payload));
                }
            } catch (RedisConnectionFailureException e) {
                log.warn("redis连接失败,6秒后重试...", e);
                try {
                    TimeUnit.SECONDS.sleep(RECONNECT_DELAY_SECONDS);
                } catch (InterruptedException interrupted) {
                    // Restore the flag so the executor that owns this thread can see the interrupt.
                    Thread.currentThread().interrupt();
                    log.info("Listener for queue {} interrupted while waiting to retry; stopping", str);
                    return;
                }
            }
        }
        log.info("Listener for queue {} interrupted; stopping", str);
    }

    /**
     * Parses one payload, passes it to the handler and removes it from the work list.
     * Payloads that cannot be parsed are removed; payloads whose handling fails stay in the list.
     */
    private void handleEvent(Consumer<MbrEventParam> consumer, ListOperations<String, String> opsForList,
            String workQueue, String payload) {
        MbrEventParam event;
        try {
            event = JSON.parseObject(payload, MbrEventParam.class);
        } catch (RuntimeException e) {
            log.error("Failed to parse event, removing it from queue {}: {}", workQueue, payload, e);
            opsForList.remove(workQueue, 1L, payload);
            return;
        }
        if (event == null) {
            // Nothing to hand to the handler; remove it so it does not linger in the work list.
            log.warn("Event parsed to null, removing it from queue {}: {}", workQueue, payload);
            opsForList.remove(workQueue, 1L, payload);
            return;
        }
        try {
            consumer.accept(event);
            log.info("从左移除1个元素,队列:{},返回结果:{}", workQueue, opsForList.remove(workQueue, 1L, payload));
        } catch (RuntimeException e) {
            // Record the failure in the application log instead of letting it escape the worker task.
            log.error("Failed to process event from queue {}: {}", workQueue, payload, e);
        }
    }
}
