package com.alibaba.otter.canal.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/canal/kafka/CanalKafkaProducer.class */
public class CanalKafkaProducer implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private Producer<String, Message> producer;
    private Producer<String, String> producer2;
    private MQProperties kafkaProperties;

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void init(MQProperties mQProperties) {
        this.kafkaProperties = mQProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", mQProperties.getServers());
        properties.put("acks", mQProperties.getAcks());
        properties.put("compression.type", mQProperties.getCompressionType());
        properties.put("retries", Integer.valueOf(mQProperties.getRetries()));
        properties.put("batch.size", Integer.valueOf(mQProperties.getBatchSize()));
        properties.put("linger.ms", Integer.valueOf(mQProperties.getLingerMs()));
        properties.put("max.request.size", Integer.valueOf(mQProperties.getMaxRequestSize()));
        properties.put("buffer.memory", Long.valueOf(mQProperties.getBufferMemory()));
        properties.put("key.serializer", StringSerializer.class.getName());
        if (mQProperties.getFlatMessage()) {
            properties.put("value.serializer", StringSerializer.class.getName());
            this.producer2 = new KafkaProducer(properties);
        } else {
            properties.put("value.serializer", MessageSerializer.class.getName());
            this.producer = new KafkaProducer(properties);
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void stop() {
        try {
            try {
                logger.info("## stop the kafka producer");
                if (this.producer != null) {
                    this.producer.close();
                }
                if (this.producer2 != null) {
                    this.producer2.close();
                }
                logger.info("## kafka producer is down.");
            } catch (Throwable th) {
                logger.warn("##something goes wrong when stopping kafka producer:", th);
                logger.info("## kafka producer is down.");
            }
        } catch (Throwable th2) {
            logger.info("## kafka producer is down.");
            throw th2;
        }
    }

    @Override // com.alibaba.otter.canal.spi.CanalMQProducer
    public void send(MQProperties.CanalDestination canalDestination, Message message, CanalMQProducer.Callback callback) {
        if (this.kafkaProperties.getFlatMessage()) {
            List<FlatMessage> messageConverter = FlatMessage.messageConverter(message);
            if (messageConverter != null) {
                for (FlatMessage flatMessage : messageConverter) {
                    if (canalDestination.getPartition() != null) {
                        try {
                            this.producer2.send(new ProducerRecord(canalDestination.getTopic(), canalDestination.getPartition(), (Object) null, JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}))).get();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                            callback.rollback();
                            return;
                        }
                    } else if (canalDestination.getPartitionHash() == null || canalDestination.getPartitionHash().isEmpty()) {
                        try {
                            this.producer2.send(new ProducerRecord(canalDestination.getTopic(), 0, (Object) null, JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}))).get();
                        } catch (Exception e2) {
                            logger.error(e2.getMessage(), e2);
                            callback.rollback();
                            return;
                        }
                    } else {
                        FlatMessage[] messagePartition = FlatMessage.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                        int length = messagePartition.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessage2 = messagePartition[i];
                            if (flatMessage2 != null) {
                                try {
                                    this.producer2.send(new ProducerRecord(canalDestination.getTopic(), Integer.valueOf(i), (Object) null, JSON.toJSONString(flatMessage2, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}))).get();
                                } catch (Exception e3) {
                                    logger.error(e3.getMessage(), e3);
                                    callback.rollback();
                                    return;
                                }
                            }
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send flat message to kafka topic: [{}], packet: {}", canalDestination.getTopic(), JSON.toJSONString(flatMessage, new SerializerFeature[]{SerializerFeature.WriteMapNullValue}));
                    }
                }
            }
        } else {
            try {
                this.producer.send(canalDestination.getPartition() != null ? new ProducerRecord(canalDestination.getTopic(), canalDestination.getPartition(), (Object) null, message) : new ProducerRecord(canalDestination.getTopic(), 0, (Object) null, message)).get();
                if (logger.isDebugEnabled()) {
                    logger.debug("Send  message to kafka topic: [{}], packet: {}", canalDestination.getTopic(), message.toString());
                }
            } catch (Exception e4) {
                logger.error(e4.getMessage(), e4);
                callback.rollback();
                return;
            }
        }
        callback.commit();
    }
}
