Flink CDC | Mysql指定时间戳读取

createh53个月前 (02-01)技术教程22

Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。

INITIAL: 全量与增量

EARLIEST_OFFSET:最早位点

LATEST_OFFSET:最近的位点

SPECIFIC_OFFSETS:指定位点

TIMESTAMP:指定时间点

SNAPSHOT:全量

源码分析

设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。

public static void main(String[] args) {
    MySqlSource.builder()
    .startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
    .build();
}

当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java

public static BinlogOffset initializeEffectiveOffset(
    BinlogOffset offset, MySqlConnection connection) {
    BinlogOffsetKind offsetKind = offset.getOffsetKind();
    switch (offsetKind) {
        case EARLIEST:
            return BinlogOffset.ofBinlogFilePosition("", 0);
        case TIMESTAMP:
            // 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
            // 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
            return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
        case LATEST:
            return DebeziumUtils.currentBinlogOffset(connection);
        default:
            return offset;
    }
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
        MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
        BinaryLogClient client =
                new BinaryLogClient(
                        config.hostname(), config.port(), config.username(), config.password());

        List binlogFiles = new ArrayList<>();
        JdbcConnection.ResultSetConsumer rsc =
                rs -> {
                    while (rs.next()) {
                        String fileName = rs.getString(1);
                        long fileSize = rs.getLong(2);
                        if (fileSize > 0) {
                            binlogFiles.add(fileName);
                        }
                    }
                };

        try {
            // 获取mysql系统内存在的binlog
            connection.query("SHOW BINARY LOGS", rsc);
            LOG.info("Total search binlog: {}", binlogFiles);

            if (binlogFiles.isEmpty()) {
                return BinlogOffset.ofBinlogFilePosition("", 0);
            }
            // 搜索最接近的binlog文件
            String binlogName = searchBinlogName(client, targetMs, binlogFiles);
            return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }
    }
    private static String searchBinlogName(
            BinaryLogClient client, long targetMs, List binlogFiles)
            throws IOException, InterruptedException {
        int startIdx = 0;
        int endIdx = binlogFiles.size() - 1;
        // 因为binlog文件名是递增的,同时时间也是递增的
        // 以二分法进行查找
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return binlogFiles.get(mid);
            }
        }

        return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
    }

从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在
MySqlBinlogSplitReadTask.java

protected void handleEvent(
            MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    // 当从binlog读取数据后,进行一次过滤  
    if (!eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        // check do we need to stop for read binlog for snapshot split.
        if (isBoundedRead()) {
            final BinlogOffset currentBinlogOffset =
                    RecordUtils.getBinlogPosition(offsetContext.getOffset());
            // reach the high watermark, the binlog reader should finished
            if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
                // send binlog end event
                try {
                    signalEventDispatcher.dispatchWatermarkEvent(
                            binlogSplit,
                            currentBinlogOffset,
                            SignalEventDispatcher.WatermarkKind.BINLOG_END);
                } catch (InterruptedException e) {
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing binlog signal event", e));
                }
                // tell reader the binlog task finished
                ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
            }
        }
    }

eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。

    private Predicate createEventFilter(BinlogOffset startingOffset) {
        // 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
            long startTimestampSec = startingOffset.getTimestampSec();
            return event ->
                    EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                            || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        }
        return event -> true;
    }

相关文章

时间戳用法详解,时间与时间戳怎么转换

在程序开发者用到的必不可少的功能就是时间戳与时间的转换了,经常数据库存的是时间戳,但是给用户需要显示具体时间,今天这篇文章就来介绍下怎么使用python,java,JavaScript,php几种语言...

Java8的Stream API确实很牛,但性能究竟如何?

专注于Java领域优质技术,欢迎关注作者:Carpenter LeeStream Performance已经对 Stream API 的用法鼓吹够多了,用起简洁直观,但性能到底怎么样呢?会不会有很高的...

荒废了3年大学时间,Java自学6个多月,找到13k的工作

莫等闲,白了少年头,空悲切。发这个帖子就是劝诫各位学弟们不要像我一样,临近毕业时才意识到学技术学知识的重要性,能趁早尽量趁早,过去应该做的事情没有去做,后面都需要你加倍补回来,如果你不去弥补前面的空缺...

分享一个小技巧——mysql统一处理创建时间和更新时间

背景mysql数据库表设计的时候,通常都会有这两个字段————创建时间和更新时间,创建时间即mysql记录第一次插入的时间,更新时间即mysql记录发生更新时的时间。通常的做法都是先创建对象,然后分别...

漫话:为什么计算机起始时间是1970年1月1日?

作者:漫话编程 来自:漫话编程问题复现1970-01-01对于开发者来说都是不陌生的,有些系统对于时间的处理如果不够好的话,就可能把时间显示成1970-01-01,所以经常有用户看到1970-01-0...

Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下

思维导航:基础为什么使用 MQ?MQ缺点几种 MQ 实现总结完整架构图RabbitMQ 六种工作模式1、Simple 简单模式2、work 工作模式3、publish/subscribe 发布订阅模式...