package com.alibaba.otter.canal.sink.entry;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.LogIdentity;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
import com.alibaba.otter.canal.store.model.Event;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/canal/sink/entry/EntryEventSink.class */
/**
 * Event sink that wraps incoming {@link CanalEntry.Entry} objects into {@link Event}s and pushes
 * them into the configured {@link CanalEventStore}.
 *
 * <p>Row-data entries are passed through the optional filter inherited from the superclass.
 * Transaction begin/end entries and batches that carry neither row data nor a heartbeat can be
 * throttled so that only one of them is forwarded per interval / per count threshold.
 *
 * <p>NOTE(review): the throttling state is kept in volatile fields and {@link AtomicLong}s, but the
 * check-then-reset sequences are not atomic; this looks designed for a single sinking thread —
 * confirm before calling {@code sink} concurrently.
 */
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {
    private static final Logger logger = LoggerFactory.getLogger(EntryEventSink.class);

    /** Upper bound for the multiplier used when backing off while the store is full. */
    private static final int MAX_FULL_TIMES = 10;

    /** Number of leading retries that only yield the CPU instead of parking. */
    private static final int YIELD_ONLY_RETRIES = 3;

    /** Store that receives the events; must be set before {@link #start()}. */
    private CanalEventStore<Event> eventStore;

    /** When {@code true}, transaction begin/end entries are throttled inside a batch. */
    protected boolean filterTransactionEntry = false;

    /** When {@code true}, batches without row data and without heartbeat are throttled. */
    protected boolean filterEmtryTransactionEntry = true;

    /**
     * Maximum distance between execute times for which throttling stays active (same unit as
     * the entry header's execute time — presumably milliseconds, TODO confirm).
     */
    protected long emptyTransactionInterval = 5000;

    /** Maximum number of consecutively throttled items before one is forced through. */
    protected long emptyTransctionThresold = 8192;

    /** Execute time of the last transaction begin/end entry that was let through. */
    protected volatile long lastTransactionTimestamp = 0;

    /** Number of transaction begin/end entries seen since the last one that was let through. */
    protected AtomicLong lastTransactionCount = new AtomicLong(0);

    /** Execute time of the last empty batch that was let through. */
    protected volatile long lastEmptyTransactionTimestamp = 0;

    /** Number of empty batches seen since the last one that was let through. */
    protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0);

    /** Accumulated time, in nanoseconds, spent waiting for the store to accept events. */
    protected AtomicLong eventsSinkBlockingTime = new AtomicLong(0);

    /** Passed to every created {@link Event}; taken from the store in {@link #start()}. */
    protected boolean raw;

    /** Creates the sink and registers a {@link HeartBeatEntryEventHandler} as first handler. */
    public EntryEventSink() {
        addHandler(new HeartBeatEntryEventHandler());
    }

    /**
     * Starts the sink, picks up the {@code raw} setting from the store when it is a
     * {@link MemoryEventStoreWithBuffer}, and starts every handler that is not yet started.
     *
     * @throws IllegalArgumentException if no event store has been set
     */
    @Override
    public void start() {
        super.start();
        Assert.notNull(this.eventStore, "eventStore must not be null");
        if (this.eventStore instanceof MemoryEventStoreWithBuffer) {
            // The raw flag is read through the concrete store type checked by the guard above.
            this.raw = ((MemoryEventStoreWithBuffer) this.eventStore).isRaw();
        }
        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    /** Stops the sink and every handler that is currently started. */
    @Override
    public void stop() {
        super.stop();
        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    /**
     * Batch-level filter hook; this implementation never filters.
     *
     * @return always {@code false}
     */
    public boolean filter(List<CanalEntry.Entry> entries, InetSocketAddress remoteAddress, String destination) {
        return false;
    }

    /**
     * Sinks a batch of entries into the event store.
     *
     * @param entries entries to sink
     * @param remoteAddress address recorded in the {@link LogIdentity} of every created event
     * @param destination not used by this implementation
     * @return {@code true} if the batch was stored or intentionally dropped by throttling,
     *         {@code false} if the sink was stopped or interrupted while waiting for the store
     */
    @Override
    public boolean sink(List<CanalEntry.Entry> entries, InetSocketAddress remoteAddress, String destination) throws CanalSinkException, InterruptedException {
        return sinkData(entries, remoteAddress);
    }

    /**
     * Converts the accepted entries of a batch into events and forwards them.
     *
     * <p>A batch containing row data or a heartbeat is always forwarded. Any other non-empty
     * batch is forwarded only when empty-transaction filtering is enabled and the interval or
     * count threshold has been exceeded; otherwise it is dropped and {@code true} is returned.
     */
    private boolean sinkData(List<CanalEntry.Entry> entries, InetSocketAddress remoteAddress) throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entries) {
            if (!doFilter(entry)) {
                continue;
            }

            CanalEntry.EntryType entryType = entry.getEntryType();
            if (this.filterTransactionEntry && isTransactionBoundary(entryType)) {
                long executeTime = entry.getHeader().getExecuteTime();
                // Same policy as the empty-batch throttle below: while we are inside the interval
                // and under the count threshold the boundary entry is skipped; once either limit
                // is exceeded the counters are reset and this entry is let through.
                if (Math.abs(executeTime - this.lastTransactionTimestamp) <= this.emptyTransactionInterval
                    && this.lastTransactionCount.incrementAndGet() <= this.emptyTransctionThresold) {
                    continue;
                }
                this.lastTransactionCount.set(0L);
                this.lastTransactionTimestamp = executeTime;
            }

            hasRowData |= entryType == CanalEntry.EntryType.ROWDATA;
            hasHeartBeat |= entryType == CanalEntry.EntryType.HEARTBEAT;
            events.add(new Event(new LogIdentity(remoteAddress, -1L), entry, this.raw));
        }

        if (hasRowData || hasHeartBeat) {
            return doSink(events);
        }
        if (!this.filterEmtryTransactionEntry || events.isEmpty()) {
            return true;
        }

        long executeTime = events.get(0).getExecuteTime();
        boolean withinLimits = Math.abs(executeTime - this.lastEmptyTransactionTimestamp) <= this.emptyTransactionInterval
            && this.lastEmptyTransactionCount.incrementAndGet() <= this.emptyTransctionThresold;
        if (withinLimits) {
            // Drop the empty batch; reporting success keeps the caller going.
            return true;
        }
        this.lastEmptyTransactionCount.set(0L);
        this.lastEmptyTransactionTimestamp = executeTime;
        return doSink(events);
    }

    /** Returns whether the type marks the beginning or the end of a transaction. */
    private static boolean isTransactionBoundary(CanalEntry.EntryType entryType) {
        return entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND;
    }

    /**
     * Decides whether a single entry is accepted.
     *
     * <p>Only row-data entries are checked, and only when a filter is configured; the filter is
     * given the {@code schema.table} name of the entry.
     *
     * @return {@code true} if the entry should be kept
     */
    protected boolean doFilter(CanalEntry.Entry entry) {
        if (this.filter == null || entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return true;
        }
        String name = getSchemaNameAndTableName(entry);
        boolean accepted = this.filter.filter(name);
        if (!accepted) {
            logger.debug("filter name[{}] entry : {}:{}",
                new Object[] { name, entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset() });
        }
        return accepted;
    }

    /**
     * Pushes the events into the store, retrying with back-off while the store rejects them.
     *
     * <p>Every handler's {@code before} callback runs once up front, {@code retry} runs after
     * each failed attempt and {@code after} runs once the store accepted the events; each
     * callback may replace the list that is used from then on. Time spent waiting is added to
     * {@link #getEventsSinkBlockingTime()}.
     *
     * <p>NOTE(review): the interruption check uses {@link Thread#interrupted()}, which clears the
     * thread's interrupt flag before {@code false} is returned.
     *
     * @param events events to store
     * @return {@code true} once stored, {@code false} if the sink is no longer running or the
     *         thread was interrupted while retrying
     */
    // The handler list is iterated with the raw handler type (as in start/stop), so the
    // callback results need an unchecked cast back to List<Event>.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public boolean doSink(List<Event> events) {
        List<Event> current = events;
        for (CanalEventDownStreamHandler handler : getHandlers()) {
            current = (List<Event>) handler.before(current);
        }

        long blockingStart = 0L;
        int fullTimes = 0;
        while (!this.eventStore.tryPut(current)) {
            if (fullTimes == 0) {
                blockingStart = System.nanoTime();
            }
            fullTimes++;
            applyWait(fullTimes);
            if (fullTimes % 100 == 0) {
                // Flush the blocking time periodically so a long stall is visible before it ends.
                long now = System.nanoTime();
                this.eventsSinkBlockingTime.addAndGet(now - blockingStart);
                blockingStart = now;
            }

            for (CanalEventDownStreamHandler handler : getHandlers()) {
                current = (List<Event>) handler.retry(current);
            }

            if (!this.running || Thread.interrupted()) {
                return false;
            }
        }

        if (fullTimes > 0) {
            this.eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
        }

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            current = (List<Event>) handler.after(current);
        }
        return true;
    }

    /**
     * Backs off before the next store attempt: yields for the first few retries, then parks for
     * one millisecond per retry, capped at {@link #MAX_FULL_TIMES} milliseconds.
     */
    private void applyWait(int fullTimes) {
        if (fullTimes <= YIELD_ONLY_RETRIES) {
            Thread.yield();
        } else {
            int cappedTimes = Math.min(fullTimes, MAX_FULL_TIMES);
            LockSupport.parkNanos(1000L * 1000L * cappedTimes);
        }
    }

    /** Builds the {@code schema.table} name of the entry from its header. */
    private String getSchemaNameAndTableName(CanalEntry.Entry entry) {
        return entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
    }

    /** Sets the store that receives the events; required before {@link #start()}. */
    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    /** Enables or disables throttling of transaction begin/end entries. */
    public void setFilterTransactionEntry(boolean filterTransactionEntry) {
        this.filterTransactionEntry = filterTransactionEntry;
    }

    /** Enables or disables throttling of batches without row data and heartbeat. */
    public void setFilterEmtryTransactionEntry(boolean filterEmtryTransactionEntry) {
        this.filterEmtryTransactionEntry = filterEmtryTransactionEntry;
    }

    /** Sets the execute-time interval used by both throttles. */
    public void setEmptyTransactionInterval(long emptyTransactionInterval) {
        this.emptyTransactionInterval = emptyTransactionInterval;
    }

    /** Sets the count threshold used by both throttles. */
    public void setEmptyTransctionThresold(long emptyTransctionThresold) {
        this.emptyTransctionThresold = emptyTransctionThresold;
    }

    /** Returns the live counter of nanoseconds spent waiting for the store. */
    public AtomicLong getEventsSinkBlockingTime() {
        return this.eventsSinkBlockingTime;
    }
}
