package com.alibaba.otter.canal.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.CanalMessageSerializer;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.exception.CanalServerException;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.class */
public class CanalRocketMQProducer implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
    private DefaultMQProducer defaultMQProducer;
    private MQProperties mqProperties;

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void init(MQProperties mQProperties) {
        this.mqProperties = mQProperties;
        ClientRPCHook clientRPCHook = null;
        if (mQProperties.getAliyunAccessKey().length() > 0 && mQProperties.getAliyunSecretKey().length() > 0) {
            SessionCredentials sessionCredentials = new SessionCredentials();
            sessionCredentials.setAccessKey(mQProperties.getAliyunAccessKey());
            sessionCredentials.setSecretKey(mQProperties.getAliyunSecretKey());
            clientRPCHook = new ClientRPCHook(sessionCredentials);
        }
        this.defaultMQProducer = new DefaultMQProducer(mQProperties.getProducerGroup(), clientRPCHook);
        this.defaultMQProducer.setNamesrvAddr(mQProperties.getServers());
        this.defaultMQProducer.setRetryTimesWhenSendFailed(mQProperties.getRetries());
        this.defaultMQProducer.setVipChannelEnabled(false);
        logger.info("##Start RocketMQ producer##");
        try {
            this.defaultMQProducer.start();
        } catch (MQClientException e) {
            throw new CanalServerException("Start RocketMQ producer error", (Throwable) e);
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void send(final MQProperties.CanalDestination canalDestination, Message message, CanalMQProducer.Callback callback) {
        if (this.mqProperties.getFlatMessage()) {
            List<FlatMessage> messageConverter = FlatMessage.messageConverter(message);
            if (messageConverter != null) {
                for (FlatMessage flatMessage : messageConverter) {
                    if (canalDestination.getPartition() != null) {
                        try {
                            logger.info("send flat message: {} to topic: {} fixed partition: {}", new Object[]{JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), canalDestination.getTopic(), canalDestination.getPartition()});
                            this.defaultMQProducer.send(new org.apache.rocketmq.common.message.Message(canalDestination.getTopic(), JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}).getBytes()), new MessageQueueSelector() { // from class: com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer.2
                                public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message2, Object obj) {
                                    return list.get(canalDestination.getPartition().intValue());
                                }
                            }, (Object) null);
                        } catch (Exception e) {
                            logger.error("send flat message to fixed partition error", e);
                            callback.rollback();
                            return;
                        }
                    } else if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] messagePartition = FlatMessage.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                        int length = messagePartition.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessage2 = messagePartition[i];
                            if (flatMessage2 != null) {
                                logger.debug("flatMessagePart: {}, partition: {}", JSON.toJSONString(flatMessage2, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), Integer.valueOf(i));
                                final int i2 = i;
                                try {
                                    this.defaultMQProducer.send(new org.apache.rocketmq.common.message.Message(canalDestination.getTopic(), JSON.toJSONString(flatMessage2, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}).getBytes()), new MessageQueueSelector() { // from class: com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer.3
                                        public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message2, Object obj) {
                                            if (i2 > list.size()) {
                                                throw new CanalServerException("partition number is error,config num:" + canalDestination.getPartitionsNum() + ", mq num: " + list.size());
                                            }
                                            return list.get(i2);
                                        }
                                    }, (Object) null);
                                } catch (Exception e2) {
                                    logger.error("send flat message to hashed partition error", e2);
                                    callback.rollback();
                                    return;
                                }
                            }
                        }
                    }
                }
            }
        } else {
            try {
                org.apache.rocketmq.common.message.Message message2 = new org.apache.rocketmq.common.message.Message(canalDestination.getTopic(), CanalMessageSerializer.serializer(message, this.mqProperties.isFilterTransactionEntry()));
                logger.debug("send message:{} to destination:{}, partition: {}", new Object[]{message2, canalDestination.getCanalDestination(), canalDestination.getPartition()});
                this.defaultMQProducer.send(message2, new MessageQueueSelector() { // from class: com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer.1
                    public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message3, Object obj) {
                        int i3 = 0;
                        if (canalDestination.getPartition() != null) {
                            i3 = canalDestination.getPartition().intValue();
                        }
                        return list.get(i3);
                    }
                }, (Object) null);
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e3) {
                logger.error("Send message error!", e3);
                callback.rollback();
                return;
            }
        }
        callback.commit();
        if (logger.isDebugEnabled()) {
            logger.debug("send message to rocket topic: {}", canalDestination.getTopic());
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void stop() {
        logger.info("## Stop RocketMQ producer##");
        this.defaultMQProducer.shutdown();
    }
}
