package com.alibaba.otter.canal.parse.inbound.mysql;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.class */
public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
    private static final int maxFullTimes = 10;
    private LogEventConvert logEventConvert;
    private EventTransactionBuffer transactionBuffer;
    private ErosaConnection connection;
    private int parserThreadCount;
    private int ringBufferSize;
    private RingBuffer<MessageEvent> disruptorMsgBuffer;
    private ExecutorService parserExecutor;
    private ExecutorService stageExecutor;
    private String destination;
    private volatile CanalParseException exception;
    private AtomicLong eventsPublishBlockingTime;
    private GTIDSet gtidSet;
    private WorkerPool<MessageEvent> workerPool;
    private BatchEventProcessor<MessageEvent> simpleParserStage;
    private BatchEventProcessor<MessageEvent> sinkStoreStage;
    private LogContext logContext;

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$DmlParserStage.class */
    /**
     * Worker-pool stage that builds the {@link CanalEntry.Entry} for events the
     * previous stage flagged with {@code needDmlParse}. Events without that flag
     * pass through untouched.
     */
    private class DmlParserStage implements WorkHandler<MessageEvent>, LifecycleAware {
        private DmlParserStage() {
        }

        /**
         * Parses the event held by {@code messageEvent} and stores the result in
         * the same slot.
         *
         * @param messageEvent ring-buffer slot to process
         * @throws Exception the {@link CanalParseException} wrapping any failure;
         *                   it is also recorded on the enclosing coprocessor
         */
        public void onEvent(MessageEvent messageEvent) throws Exception {
            try {
                if (!messageEvent.isNeedDmlParse()) {
                    return;
                }
                final CanalEntry.Entry parsed;
                // Type 29 goes through the generic parse path; presumably this is
                // the rows-query event - confirm against the LogEvent constants.
                if (messageEvent.getEvent().getHeader().getType() == 29) {
                    parsed = MysqlMultiStageCoprocessor.this.logEventConvert.parse(messageEvent.getEvent(), false);
                } else {
                    // Everything else flagged for DML parsing is treated as a rows event.
                    parsed = MysqlMultiStageCoprocessor.this.logEventConvert.parseRowsEvent((RowsLogEvent) messageEvent.getEvent(), messageEvent.getTable());
                }
                messageEvent.setEntry(parsed);
            } catch (Throwable th) {
                // Record the failure so publish() can surface it to the producer.
                MysqlMultiStageCoprocessor.this.exception = new CanalParseException(th);
                throw MysqlMultiStageCoprocessor.this.exception;
            }
        }

        /** No start-up work is needed for this stage. */
        public void onStart() {
        }

        /** No shutdown work is needed for this stage. */
        public void onShutdown() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$MessageEvent.class */
    /**
     * Mutable ring-buffer slot passed from stage to stage. The producer fills in
     * either {@code buffer} or {@code event}; later stages add the table meta,
     * the DML-parse flag and the parsed entry.
     */
    public class MessageEvent {
        // Input: raw bytes still to be decoded, or an already decoded event.
        private LogBuffer buffer;
        private LogEvent event;

        // Intermediate state written by the simple parser stage.
        private TableMeta table;
        private boolean needDmlParse;

        // Output: the parsed entry handed to the sink stage.
        private CanalEntry.Entry entry;

        MessageEvent() {
        }

        /** @return the raw log buffer, or {@code null} if none was set */
        public LogBuffer getBuffer() {
            return this.buffer;
        }

        /** @param logBuffer raw log buffer to store in this slot */
        public void setBuffer(LogBuffer logBuffer) {
            this.buffer = logBuffer;
        }

        /** @return the decoded log event, or {@code null} if none was set */
        public LogEvent getEvent() {
            return this.event;
        }

        /** @param logEvent decoded log event to store in this slot */
        public void setEvent(LogEvent logEvent) {
            this.event = logEvent;
        }

        /** @return the table meta stored for this event, or {@code null} */
        public TableMeta getTable() {
            return this.table;
        }

        /** @param tableMeta table meta to store for this event */
        public void setTable(TableMeta tableMeta) {
            this.table = tableMeta;
        }

        /** @return whether the DML parser stage still has to build the entry */
        public boolean isNeedDmlParse() {
            return this.needDmlParse;
        }

        /** @param z {@code true} if the DML parser stage must build the entry */
        public void setNeedDmlParse(boolean z) {
            this.needDmlParse = z;
        }

        /** @return the parsed entry, or {@code null} if none was produced */
        public CanalEntry.Entry getEntry() {
            return this.entry;
        }

        /** @param entry parsed entry to store in this slot */
        public void setEntry(CanalEntry.Entry entry) {
            this.entry = entry;
        }
    }

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$MessageEventFactory.class */
    /**
     * Factory handed to the ring buffer so it can pre-allocate its
     * {@link MessageEvent} slots.
     */
    class MessageEventFactory implements EventFactory<MessageEvent> {
        MessageEventFactory() {
        }

        /**
         * Creates one empty slot. The method must be named {@code newInstance}
         * to implement {@link EventFactory}; the decompiler had mangled it to
         * {@code m6newInstance}, which left the interface method unimplemented.
         *
         * @return a fresh, empty {@link MessageEvent}
         */
        public MessageEvent newInstance() {
            return new MessageEvent();
        }
    }

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$SimpleFatalExceptionHandler.class */
    /**
     * Exception handler installed on every stage of the pipeline. All three
     * callbacks are intentionally empty: the stages themselves record their
     * failure in the enclosing coprocessor's {@code exception} field before
     * rethrowing, and nothing further is done here.
     *
     * NOTE(review): because {@code handleEventException} returns normally, the
     * Disruptor processor presumably treats the failed event as handled and moves
     * on to the next one - confirm against BatchEventProcessor / WorkProcessor.
     * If so, consider rethrowing here, together with a check of {@code exception}
     * inside the publish retry loop so the producer cannot spin on a stalled ring.
     */
    class SimpleFatalExceptionHandler implements ExceptionHandler {
        SimpleFatalExceptionHandler() {
        }

        /**
         * Called when a stage's event callback throws; deliberately does nothing.
         *
         * @param th  the exception thrown by the stage
         * @param j   sequence of the event being processed
         * @param obj the event being processed
         */
        public void handleEventException(Throwable th, long j, Object obj) {
        }

        /**
         * Called when a stage's start callback throws; deliberately does nothing.
         *
         * @param th the exception thrown during start-up
         */
        public void handleOnStartException(Throwable th) {
        }

        /**
         * Called when a stage's shutdown callback throws; deliberately does nothing.
         *
         * @param th the exception thrown during shutdown
         */
        public void handleOnShutdownException(Throwable th) {
        }
    }

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$SimpleParserStage.class */
    /**
     * First stage of the pipeline. It decodes the raw buffer into a
     * {@link LogEvent} when the producer did not supply one, then either parses
     * the event straight into an entry or resolves its table meta and flags it
     * for the DML parser stage.
     */
    private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
        // NOTE(review): 0 and 165 presumably bound the event types to decode - confirm against LogDecoder.
        private LogDecoder decoder = new LogDecoder(0, 165);
        private LogContext context;

        /**
         * @param logContext decoding context; the coprocessor's GTID set is
         *                   applied to it when one has been configured
         */
        public SimpleParserStage(LogContext logContext) {
            this.context = logContext;
            final GTIDSet configuredGtids = MysqlMultiStageCoprocessor.this.gtidSet;
            if (configuredGtids != null) {
                logContext.setGtidSet(configuredGtids);
            }
        }

        /**
         * Decodes the slot if necessary and classifies the event.
         *
         * @param messageEvent ring-buffer slot to process
         * @param j            sequence of the slot (unused)
         * @param z            end-of-batch flag (unused)
         * @throws Exception the {@link CanalParseException} wrapping any failure;
         *                   it is also recorded on the enclosing coprocessor
         */
        public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
            try {
                LogEvent logEvent = messageEvent.getEvent();
                if (logEvent == null) {
                    // The producer supplied raw bytes only; decode them now.
                    logEvent = this.decoder.decode(messageEvent.getBuffer(), this.context);
                    messageEvent.setEvent(logEvent);
                }

                TableMeta resolvedTable = null;
                boolean dmlParseNeeded = false;
                switch (logEvent.getHeader().getType()) {
                    // Write-rows variants.
                    case 23:
                    case 30:
                        resolvedTable = MysqlMultiStageCoprocessor.this.logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
                        dmlParseNeeded = true;
                        break;
                    // Update-rows variants.
                    case 24:
                    case 31:
                    case 39:
                        resolvedTable = MysqlMultiStageCoprocessor.this.logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
                        dmlParseNeeded = true;
                        break;
                    // Delete-rows variants.
                    case 25:
                    case 32:
                        resolvedTable = MysqlMultiStageCoprocessor.this.logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
                        dmlParseNeeded = true;
                        break;
                    // Deferred to the DML stage without a table lookup
                    // (presumably the rows-query event - confirm).
                    case 29:
                        dmlParseNeeded = true;
                        break;
                    // Every other event type is parsed right here.
                    default:
                        messageEvent.setEntry(MysqlMultiStageCoprocessor.this.logEventConvert.parse(messageEvent.getEvent(), false));
                        break;
                }
                messageEvent.setNeedDmlParse(dmlParseNeeded);
                messageEvent.setTable(resolvedTable);
            } catch (Throwable th) {
                // Record the failure so publish() can surface it to the producer.
                MysqlMultiStageCoprocessor.this.exception = new CanalParseException(th);
                throw MysqlMultiStageCoprocessor.this.exception;
            }
        }

        /** No start-up work is needed for this stage. */
        public void onStart() {
        }

        /** No shutdown work is needed for this stage. */
        public void onShutdown() {
        }
    }

    /* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor$SinkStoreStage.class */
    private class SinkStoreStage implements EventHandler<MessageEvent>, LifecycleAware {
        private SinkStoreStage() {
        }

        public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
            try {
                if (messageEvent.getEntry() != null) {
                    MysqlMultiStageCoprocessor.this.transactionBuffer.add(messageEvent.getEntry());
                }
                LogEvent event = messageEvent.getEvent();
                if ((MysqlMultiStageCoprocessor.this.connection instanceof MysqlConnection) && event.getSemival() == 1) {
                    ((MysqlConnection) MysqlMultiStageCoprocessor.this.connection).sendSemiAck(event.getHeader().getLogFileName(), Long.valueOf(event.getHeader().getLogPos()));
                }
                messageEvent.setBuffer(null);
                messageEvent.setEvent(null);
                messageEvent.setTable(null);
                messageEvent.setEntry(null);
                messageEvent.setNeedDmlParse(false);
            } catch (Throwable th) {
                MysqlMultiStageCoprocessor.this.exception = new CanalParseException(th);
                throw MysqlMultiStageCoprocessor.this.exception;
            }
        }

        public void onStart() {
        }

        public void onShutdown() {
        }
    }

    /**
     * Stores the configuration; no threads or buffers are created until
     * {@link #start()}.
     *
     * @param i                      ring-buffer size passed to the Disruptor ring buffer
     * @param i2                     number of DML parser threads
     * @param logEventConvert        converter used by the parsing stages
     * @param eventTransactionBuffer buffer that receives parsed entries
     * @param str                    destination name, used as suffix of the worker thread names
     */
    public MysqlMultiStageCoprocessor(int i, int i2, LogEventConvert logEventConvert, EventTransactionBuffer eventTransactionBuffer, String str) {
        this.destination = str;
        this.logEventConvert = logEventConvert;
        this.transactionBuffer = eventTransactionBuffer;
        this.parserThreadCount = i2;
        this.ringBufferSize = i;
    }

    /**
     * Builds the three-stage pipeline (simple parser, DML parser pool, sink)
     * on a fresh single-producer ring buffer and starts its threads.
     *
     * <p>The configured parser thread count is clamped to at least one, and the
     * clamped value sizes both the parser executor and the DML handler array.
     * Using the raw value for the handler array would create no DML workers for
     * {@code 0} (leaving the sink stage gated on nothing but the cursor) and
     * throw {@link NegativeArraySizeException} for negative values.
     */
    public void start() {
        super.start();
        // Clear any failure left over from a previous run.
        this.exception = null;

        final int parserThreads = this.parserThreadCount > 0 ? this.parserThreadCount : 1;

        this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(), this.ringBufferSize, new BlockingWaitStrategy());
        this.parserExecutor = Executors.newFixedThreadPool(parserThreads, new NamedThreadFactory("MultiStageCoprocessor-Parser-" + this.destination));
        // Two threads: one for the simple parser stage, one for the sink stage.
        this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-" + this.destination));
        SimpleFatalExceptionHandler exceptionHandler = new SimpleFatalExceptionHandler();

        // Stage 1: decode and classify, gated only on the producer cursor.
        SequenceBarrier parserBarrier = this.disruptorMsgBuffer.newBarrier(new Sequence[0]);
        this.logContext = new LogContext();
        this.simpleParserStage = new BatchEventProcessor<>(this.disruptorMsgBuffer, parserBarrier, new SimpleParserStage(this.logContext));
        this.simpleParserStage.setExceptionHandler(exceptionHandler);
        this.disruptorMsgBuffer.addGatingSequences(new Sequence[]{this.simpleParserStage.getSequence()});

        // Stage 2: DML parser pool, runs behind stage 1.
        SequenceBarrier dmlBarrier = this.disruptorMsgBuffer.newBarrier(new Sequence[]{this.simpleParserStage.getSequence()});
        DmlParserStage[] dmlHandlers = new DmlParserStage[parserThreads];
        for (int idx = 0; idx < parserThreads; idx++) {
            dmlHandlers[idx] = new DmlParserStage();
        }
        this.workerPool = new WorkerPool<>(this.disruptorMsgBuffer, dmlBarrier, exceptionHandler, dmlHandlers);
        Sequence[] workerSequences = this.workerPool.getWorkerSequences();
        this.disruptorMsgBuffer.addGatingSequences(workerSequences);

        // Stage 3: sink, runs behind every DML worker.
        SequenceBarrier sinkBarrier = this.disruptorMsgBuffer.newBarrier(workerSequences);
        this.sinkStoreStage = new BatchEventProcessor<>(this.disruptorMsgBuffer, sinkBarrier, new SinkStoreStage());
        this.sinkStoreStage.setExceptionHandler(exceptionHandler);
        this.disruptorMsgBuffer.addGatingSequences(new Sequence[]{this.sinkStoreStage.getSequence()});

        this.stageExecutor.submit((Runnable) this.simpleParserStage);
        this.stageExecutor.submit((Runnable) this.sinkStoreStage);
        this.workerPool.start(this.parserExecutor);
    }

    /**
     * Installs a format description carrying the given checksum algorithm on
     * the decoding context. A value of {@code 0} leaves the context unchanged
     * (presumably "checksum off" - confirm against the LogEvent constants).
     *
     * <p>The context is created in {@link #start()}, so this must be called
     * after the coprocessor has been started.
     *
     * @param i binlog checksum algorithm identifier
     */
    public void setBinlogChecksum(int i) {
        if (i == 0) {
            return;
        }
        FormatDescriptionLogEvent formatDescription = new FormatDescriptionLogEvent(4, i);
        this.logContext.setFormatDescription(formatDescription);
    }

    /**
     * Halts all three stages, shuts both executors down and marks the
     * life cycle as stopped.
     *
     * <p>Each pipeline component is null-checked so that an instance whose
     * {@link #start()} failed part-way can still release the threads it did
     * create, instead of dying on a {@link NullPointerException} before the
     * executors are shut down.
     */
    public void stop() {
        if (this.workerPool != null) {
            this.workerPool.halt();
        }
        if (this.simpleParserStage != null) {
            this.simpleParserStage.halt();
        }
        if (this.sinkStoreStage != null) {
            this.sinkStoreStage.halt();
        }
        shutdownAndAwait(this.parserExecutor);
        shutdownAndAwait(this.stageExecutor);
        super.stop();
    }

    /**
     * Best-effort shutdown of one executor: interrupts its workers and waits up
     * to one second for them to finish. This matches the previous effective
     * behaviour, whose retry loop could never iterate because
     * {@code isShutdown()} is always true after {@code shutdownNow()}.
     *
     * @param executor executor to shut down; {@code null} is ignored
     */
    private static void shutdownAndAwait(ExecutorService executor) {
        if (executor == null) {
            return;
        }
        try {
            executor.shutdownNow();
            executor.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (RuntimeException ignored) {
            // Shutdown is best effort; a failure here must not prevent stop() from completing.
        }
    }

    /**
     * Publishes a raw, not yet decoded log buffer into the pipeline.
     *
     * @param logBuffer raw binlog bytes; decoded by the first stage
     * @return the result of the shared two-argument publish routine
     */
    @Override // com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor
    public boolean publish(LogBuffer logBuffer) {
        // No decoded event is supplied on this path.
        return publish(logBuffer, (LogEvent) null);
    }

    /**
     * Publishes an already decoded log event into the pipeline.
     *
     * @param logEvent decoded event; the first stage skips decoding for it
     * @return the result of the shared two-argument publish routine
     */
    @Override // com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor
    public boolean publish(LogEvent logEvent) {
        // No raw buffer is supplied on this path.
        return publish((LogBuffer) null, logEvent);
    }

    /**
     * Claims a ring-buffer slot, stores the payload in it and publishes it.
     * While the buffer is full the call backs off and retries until a slot
     * frees up, the coprocessor is stopped, a stage reports a failure, or the
     * calling thread is interrupted.
     *
     * <p>Differences from the previous implementation:
     * <ul>
     *   <li>the recorded stage failure is rechecked on every retry, so a
     *       producer waiting on a full buffer notices a failed stage;</li>
     *   <li>an interrupt no longer drops the payload while still returning
     *       {@code true}: the interrupt flag is left set and {@code false} is
     *       returned;</li>
     *   <li>a missing blocking-time counter is tolerated.</li>
     * </ul>
     *
     * @param logBuffer raw buffer to publish, or {@code null} to publish {@code logEvent}
     * @param logEvent  decoded event to publish when {@code logBuffer} is {@code null}
     * @return {@code true} only if the payload was published and the
     *         coprocessor is still started
     * @throws CanalParseException the failure recorded by a pipeline stage, if any
     */
    private boolean publish(LogBuffer logBuffer, LogEvent logEvent) {
        rethrowStageFailure();
        if (!isStart()) {
            return false;
        }

        long blockedSince = 0L;
        int fullTimes = 0;
        boolean published = false;
        do {
            rethrowStageFailure();
            try {
                long sequence = this.disruptorMsgBuffer.tryNext();
                MessageEvent slot = this.disruptorMsgBuffer.get(sequence);
                if (logBuffer != null) {
                    slot.setBuffer(logBuffer);
                } else {
                    slot.setEvent(logEvent);
                }
                this.disruptorMsgBuffer.publish(sequence);
                published = true;
                if (fullTimes > 0) {
                    recordBlockingNanos(System.nanoTime() - blockedSince);
                }
                break;
            } catch (InsufficientCapacityException e) {
                if (fullTimes == 0) {
                    blockedSince = System.nanoTime();
                }
                fullTimes++;
                applyWait(fullTimes);
                // Inspect without clearing, so the caller still sees the interrupt.
                boolean interrupted = Thread.currentThread().isInterrupted();
                if (fullTimes % 1000 == 0) {
                    // Flush the accumulated wait periodically so long stalls show up promptly.
                    long now = System.nanoTime();
                    recordBlockingNanos(now - blockedSince);
                    blockedSince = now;
                }
                if (interrupted) {
                    break;
                }
            }
        } while (isStart());
        return published && isStart();
    }

    /** Throws the failure recorded by a pipeline stage, if there is one. */
    private void rethrowStageFailure() {
        CanalParseException failure = this.exception;
        if (failure != null) {
            throw failure;
        }
    }

    /** Adds {@code nanos} to the blocking-time counter when one has been configured. */
    private void recordBlockingNanos(long nanos) {
        AtomicLong counter = this.eventsPublishBlockingTime;
        if (counter != null) {
            counter.addAndGet(nanos);
        }
    }

    /**
     * Backs off between publish retries: the first three attempts only yield,
     * later attempts park for 100000 ns per attempt, capped at
     * {@code maxFullTimes} attempts' worth.
     *
     * @param i number of consecutive failed attempts so far (1-based)
     */
    private void applyWait(int i) {
        if (i <= 3) {
            Thread.yield();
            return;
        }
        final int cappedAttempts = Math.min(i, maxFullTimes);
        LockSupport.parkNanos(100000 * cappedAttempts);
    }

    /**
     * Replaces the converter used by the parsing stages.
     *
     * @param logEventConvert converter that turns log events into entries
     */
    public void setLogEventConvert(LogEventConvert logEventConvert) {
        this.logEventConvert = logEventConvert;
    }

    /**
     * Replaces the buffer to which the sink stage adds parsed entries.
     *
     * @param eventTransactionBuffer destination for parsed entries
     */
    public void setTransactionBuffer(EventTransactionBuffer eventTransactionBuffer) {
        this.transactionBuffer = eventTransactionBuffer;
    }

    /**
     * Sets the connection the events come from. The sink stage sends semi-sync
     * acknowledgements through it, but only when it is a MysqlConnection.
     *
     * @param erosaConnection source connection
     */
    public void setConnection(ErosaConnection erosaConnection) {
        this.connection = erosaConnection;
    }

    /**
     * Sets the counter to which publishing adds the nanoseconds it spent
     * waiting for a free ring-buffer slot.
     *
     * @param atomicLong accumulator for blocking time, in nanoseconds
     */
    public void setEventsPublishBlockingTime(AtomicLong atomicLong) {
        this.eventsPublishBlockingTime = atomicLong;
    }

    /**
     * Sets the GTID set applied to the decoding context. It is read when the
     * simple parser stage is constructed in start(), so it must be set before
     * start() to take effect.
     *
     * @param gTIDSet GTID set to apply, or {@code null} for none
     */
    public void setGtidSet(GTIDSet gTIDSet) {
        this.gtidSet = gTIDSet;
    }
}
