package com.chuangjiangx.event.rocketmq;

import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;

/* loaded from: input_file:WEB-INF/lib/chuangjiangx-event-module-1.0-SNAPSHOT.jar:com/chuangjiangx/event/rocketmq/AbstractRocketPushConsumer.class */
/**
 * Base class for push-style RocketMQ consumers.
 *
 * <p>Subclasses supply the concrete {@link MQPushConsumer} through {@link #getConsumer()};
 * {@link #listening()} then subscribes it to the configured topic/tags, attaches the
 * message listener and starts it.
 */
public abstract class AbstractRocketPushConsumer extends AbstractRocketConsumer implements RocketPushConsumer {
    /**
     * Creates the consumer, passing the configuration on to the superclass.
     *
     * @param rocketConfig configuration later read by {@link #listening()} for topic and tags
     */
    public AbstractRocketPushConsumer(RocketConfig rocketConfig) {
        super(rocketConfig);
    }

    /**
     * Returns the push consumer that {@link #listening()} subscribes, wires and starts.
     *
     * @return the underlying RocketMQ push consumer
     */
    protected abstract MQPushConsumer getConsumer();

    /**
     * Subscribes the consumer to the configured topic and tags, registers the message
     * listener, and starts the consumer.
     *
     * <p>Failures are reported to the caller instead of being printed and ignored: a
     * consumer whose subscription failed must not be started, and a consumer that failed
     * to start must not be reported as listening.
     *
     * @throws IllegalStateException if subscribing or starting fails (the original
     *         {@link MQClientException} is attached as the cause), or if the listener is
     *         neither a {@link MessageListenerOrderly} nor a {@link MessageListenerConcurrently}
     */
    @Override // com.chuangjiangx.event.rocketmq.RocketPushConsumer
    public void listening() {
        MQPushConsumer consumer = getConsumer();
        RocketConfig config = getConfig();
        MessageListener listener = getListener();
        String topic = config.getTopic();

        try {
            consumer.subscribe(topic, config.getTags());
        } catch (MQClientException e) {
            // Do not continue to register/start a consumer that has no subscription.
            throw new IllegalStateException("rocket mq consumer failed to subscribe to topic: " + topic, e);
        }

        registerListener(consumer, listener);

        try {
            consumer.start();
        } catch (MQClientException e) {
            // Surface the failure so the caller knows nothing is being consumed.
            throw new IllegalStateException("rocket mq consumer failed to start for topic: " + topic, e);
        }
    }

    /**
     * Registers the listener on the consumer using the overload matching its concrete kind.
     * The orderly check comes first, so a listener implementing both interfaces is
     * registered as orderly.
     */
    private static void registerListener(MQPushConsumer consumer, MessageListener listener) {
        if (listener instanceof MessageListenerOrderly) {
            consumer.registerMessageListener((MessageListenerOrderly) listener);
            return;
        }
        if (listener instanceof MessageListenerConcurrently) {
            consumer.registerMessageListener((MessageListenerConcurrently) listener);
            return;
        }
        // Also reached when the listener is null, since instanceof is false for null.
        throw new IllegalStateException("rocket mq consumer 注册了无法识别的监听器");
    }

    /**
     * Intentionally empty in this class.
     * NOTE(review): the contract of this hook is defined by AbstractRocketConsumer, which is
     * not visible here — confirm that a no-op is the intended behavior for push consumers.
     */
    @Override // com.chuangjiangx.event.rocketmq.AbstractRocketConsumer
    protected void doRegister() {
    }
}
