使用Debezium Engine运行MySQL连接器



Debezium是一个开源分布式数据监控平台,可以持续捕获和流式传输对外部数据库系统所做的实时修改(CDC)。通过CDC操作,Debezium 将外部数据库转换为实时事件流,从而获取和记录在相应数据库应用程序上所做的行级更改。

通常Debezium会配合Kafka Connect集群使用,以确保高级别的容错性和可靠性。对于一般级别的应用来说,使用嵌入式Debezium引擎即可。以下为Debezium MySQL连接器的详细配置及使用。

环境条件

  • Debezium Version: 1.9

  • MySQL Version: 8.0.31

  • JDK Version: 1.8.0.31

官方文档

https://debezium.io/documentation/reference/1.9/index.html

Debezium引擎

https://debezium.io/documentation/reference/1.9/development/engine.html

设置MySQL服务器

创建具有指定权限的用户

CREATE USER 'cdcuser' IDENTIFIED BY 'password';

GRANT SELECT, INSERT, UPDATE, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdcuser'@'%';

FLUSH PRIVILEGES;

注意RELOAD权限用于在快照阶段获取全局锁。 如果只需要增量更新则不需要添加此权限,连接器属性需要配置为:

snapshot.mode=schema_only
snapshot.locking.mode=none

如果需要快照读取阶段(默认),同时避免获取全局锁,连接器属性需要配置为:

snapshot.locking.mode=none

snapshot.locking.mode默认为minimal,会获取全局读锁,配置为none时,只获取行锁(InnoDB)。

官方文档中此处说明为:

防止连接器在快照期间获取任何表锁。虽然所有快照模式都允许使用此设置,但当且仅当快照运行时没有架构更改时才可以安全使用。对于使用MyISAM 引擎定义的表,尽管在MyISAM获取表锁时设置了此属性,但表仍将被锁定。此行为不同于InnoDB 引擎,后者获取行级锁。

一般在数据快照阶段不会做数据库表结构更改,建议设置为none,毕竟全局锁的风险太大,不适合用于线上环境。

binlog配置

确认binlog已开启写入功能,并且binlog_format=ROW

my.cnf配置如下:

log_bin=binlog # 开启 binlog,在MySQL 8.0中默认开启
binlog-format=ROW # 选择 ROW 模式
server_id=1 #不要和连接器中配置的database.server.id重复

Maven依赖配置

<properties>
    <debezium.version>1.9.7.Final</debezium.version>
    <groovy.version>3.0.7</groovy.version>
</properties>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-scripting</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy</artifactId>
    <version>${groovy.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-json</artifactId>
    <version>${groovy.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-jsr223</artifactId>
    <version>${groovy.version}</version>
</dependency>

配置和运行MySQL连接器完整示例

DebeziumDemo

import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.KafkaDatabaseHistory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import test.cdc.DateTimeConverter;

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class DebeziumDemo {

    public static void main(String[] args) throws InterruptedException {
        Configuration config = Configuration.create().build();
        final Properties props = config.asProperties();

        //配置连接器基本属性
        props.setProperty("name", "app_debezium_engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "cdcuser");
        props.setProperty("database.password", "password");
        props.setProperty("database.server.name", "app_debezium_connector_v0.2");
        props.setProperty("database.server.id", "85744");
        props.setProperty("database.include.list", "testdb");
        props.setProperty("database.connectionTimeZone", "Asia/Shanghai");
        props.setProperty("table.include.list", "testdb.test_cdc,testdb.debezium_signal,testdb.debezium_heartbeat");
        props.setProperty("decimal.handling.mode", "string");
        props.setProperty("include.schema.changes", "false");
        props.setProperty("max.batch.size", "2048");
        props.setProperty("max.queue.size", "8192");
        props.setProperty("poll.interval.ms", "1000");
        props.setProperty("connect.timeout.ms", "10000");
        props.setProperty("tombstones.on.delete", "false");

        //数据库连接属性(以database为前缀)
        props.setProperty("database.useSSL", "false");
        props.setProperty("database.allowPublicKeyRetrieval", "true");

        //配置连接器高级属性
        props.setProperty("converters", DateTimeConverter.CONVERTERS_NAME);
        props.setProperty(DateTimeConverter.CONVERTERS_TYPE, DateTimeConverter.class.getCanonicalName());
        props.setProperty("snapshot.mode", "schema_only");
        props.setProperty("snapshot.locking.mode", "none");
        props.setProperty("heartbeat.interval.ms", "10000");
        props.setProperty("heartbeat.action.query", "INSERT INTO testdb.debezium_heartbeat (connector, last_heartbeat) VALUES ('app_debezium_connector_v0.2', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = now();");
        props.setProperty("signal.data.collection", "testdb.debezium_signal");
        props.setProperty("incremental.snapshot.chunk.size", "1024");

        //配置引擎属性
        props.setProperty("offset.flush.interval.ms", "5000");
        props.setProperty("offset.flush.timeout.ms", "10000");

        //使用本地存储偏移量
        //props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        //props.setProperty("offset.storage.file.filename", "/opt/storage/offsets.dat");

        //使用kafka存储偏移量
        props.setProperty("bootstrap.servers", "kafka-node:9092");
        props.setProperty("offset.storage", KafkaOffsetBackingStore.class.getCanonicalName());
        props.setProperty("offset.storage.topic", "debezium.offset.storage");
        props.setProperty("offset.storage.partitions", "1");
        props.setProperty("offset.storage.replication.factor", "1");

        //配置数据库历史属性
        //使用本地文件存储数据库模式历史
        //props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        //props.setProperty("database.history.file.filename", "/opt/storage/dbhistory.dat");

        //使用kafka存储数据库模式历史
        props.setProperty("database.history", KafkaDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.kafka.topic", "debezium.database.history");
        props.setProperty("database.history.kafka.bootstrap.servers", "kafka-node:9092");

        //从json序列化纪录中排除数据库模式信息
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");

        //日期时间类型转换器自定义属性(属性前缀为转换器名称)
        props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIME, "HH:mm:ss");
        props.setProperty(DateTimeConverter.FORMATTER_PATTERN_DATE, "yyyy-MM-dd");
        props.setProperty(DateTimeConverter.FORMATTER_PATTERN_DATETIME, "yyyy-MM-dd HH:mm:ss");
        props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIMESTAMP, "yyyy-MM-dd HH:mm:ss");
        props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIMESTAMP_ZONID, "Asia/Shanghai");

        //自定义消息过滤配置
        props.setProperty("transforms", "filter");
        props.setProperty("transforms.filter.type", "io.debezium.transforms.Filter");
        //props.setProperty("transforms.filter.topic.regex", "(?!.*(__debezium-heartbeat)).*");
        props.setProperty("transforms.filter.language", "jsr223.groovy");
        props.setProperty("transforms.filter.condition", "!topic.matches('__debezium-heartbeat.(.*)') && value.source.table.matches('(test_cdc)')");

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .using((boolean success, String message, Throwable error) -> {
                    if (success) {
                        log.info("[CompletionCallback] \nsuccess => {} \nmessage => {}", true, message);
                        return;
                    }
                    log.error("[CompletionCallback] \nsuccess => {}", false, error);
                })
                .notifying(new ChangeEventConsumer())
                .build()
        ) {
            // 以异步方式运行引擎
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            
            // 设置60s后停止接收新的事件,仅处理已接收的事件
            Thread.sleep(60000);
            executor.shutdown();
            
            // 等待引擎处理剩余事件
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    log.info("The embedded engine cant shut down");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static class ChangeEventConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {

        @Override
        public void handleBatch(List<ChangeEvent<String, String>> list, DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
            for (ChangeEvent<String, String> changeEvent : list) {
                System.out.println("[key]:" + changeEvent.key());
                System.out.println("[value]:" + changeEvent.value());
                System.out.println("[destination]:" + changeEvent.destination());
                
                // calling for each record
                committer.markProcessed(changeEvent);
            }
            
            // calling when this batch is finished
            committer.markBatchFinished();
        }
    }
}

DateTimeConverter

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Properties;

/**
 * 自定义日期时间类型转换器
 */
@Slf4j
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    public final static String CONVERTERS_NAME = "datetime";
    public final static String CONVERTERS_TYPE = CONVERTERS_NAME + ".type";

    public final static String FORMATTER_PATTERN_TIME = buildFullPropertyName("formatter.pattern.time");
    public final static String FORMATTER_PATTERN_DATE = buildFullPropertyName("formatter.pattern.date");
    public final static String FORMATTER_PATTERN_DATETIME = buildFullPropertyName("formatter.pattern.datetime");
    public final static String FORMATTER_PATTERN_TIMESTAMP = buildFullPropertyName("formatter.pattern.timestamp");
    public final static String FORMATTER_PATTERN_TIMESTAMP_ZONID = buildFullPropertyName("formatter.pattern.timestamp.zoneid");

    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    private static String buildFullPropertyName(String shortPropertyName) {
        return CONVERTERS_NAME + "." + shortPropertyName;
    }

    private static String buildShortPropertyName(String fullPropertyName) {
        return fullPropertyName.replaceFirst(CONVERTERS_NAME + ".", "");
    }

    @Override
    public void configure(Properties properties) {
        Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIME)))
                .ifPresent(f -> timeFormatter = DateTimeFormatter.ofPattern(f));
        Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_DATE)))
                .ifPresent(f -> dateFormatter = DateTimeFormatter.ofPattern(f));
        Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_DATETIME)))
                .ifPresent(f -> dateTimeFormatter = DateTimeFormatter.ofPattern(f));
        Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIMESTAMP)))
                .ifPresent(f -> timestampFormatter = DateTimeFormatter.ofPattern(f));
        Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIMESTAMP_ZONID)))
                .ifPresent(z -> timestampZoneId = ZoneId.of(z));
    }

    @Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
        String columnType = relationalColumn.typeName().toUpperCase();
        Converter converter = null;

        switch (columnType) {
            case "TIME":
                converter = (x) -> {
                    if (x instanceof Duration)
                        return LocalTime.ofNanoOfDay(((Duration) x).toNanos()).format(timeFormatter);
                    if (x instanceof String)
                        return LocalTime.ofNanoOfDay(Duration.parse((String) x).toNanos()).format(timeFormatter);

                    log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
                            .getName());
                    return null;
                };
                break;
            case "DATE":
                converter = (x) -> {
                    if (x instanceof LocalDate)
                        return ((LocalDate) x).format(dateFormatter);
                    if (x instanceof String)
                        return LocalDate.parse((String) x).format(dateFormatter);

                    log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
                            .getName());
                    return null;
                };
                break;
            case "DATETIME":
                converter = (x) -> {
                    if (x instanceof LocalDateTime)// 增量更新阶段时间类型为LocalDateTime
                        return ((LocalDateTime) x).format(dateTimeFormatter);
                    if (x instanceof Timestamp)// 快照读取阶段时间类型为Timestamp
                        return ((Timestamp) x).toLocalDateTime().format(dateTimeFormatter);
                    if (x instanceof String)
                        return LocalDateTime.parse((String) x).format(dateTimeFormatter);

                    log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
                            .getName());
                    return null;
                };
                break;
            case "TIMESTAMP":
                converter = (x) -> {
                    if (x instanceof ZonedDateTime)// 增量更新阶段时间类型为ZonedDateTime, 需指定时区
                        return ((ZonedDateTime) x).withZoneSameInstant(timestampZoneId).format(timestampFormatter);
                    if (x instanceof Timestamp)// 快照读取阶段时间类型为Timestamp, 使用数据库服务器默认时区
                        return ((Timestamp) x).toLocalDateTime().format(timestampFormatter);
                    if (x instanceof String)
                        return ZonedDateTime.parse((String) x).withZoneSameInstant(timestampZoneId)
                                .format(timestampFormatter);

                    log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
                            .getName());
                    return null;
                };
        }

        if (null != converter) {
            converterRegistration.register(SchemaBuilder.string(), converter);
            log.info("register converter for columnType {} to schema {}", columnType, SchemaBuilder.string().type());
        }
    }
}

常见问题及特殊配置

数据库结构更改

如果正在捕获的数据库表发生结构变更,则从Debezium发出的更改事件也会实时生效。例如在要捕获的表中新增字段,在Debezium变更事件消息中,新字段均会出现在事件前后的状态中。

为连接器捕获名单添加新表

为现有的连接器添加一个新表进行cdc操作时,无论连接器是否重启,均不会生效。

官方论坛关于此问题的讨论:

https://groups.google.com/g/debezium/c/Iw4BgLZ8Mq4

查阅官方文档可知,database.server.name属性是为Debezium捕获特定数据的数据库提供的一个逻辑名称。可以推测这个属性和正在捕获的表是对应的。因此对于后续在捕获名单中添加新表的情况,可以在属性值的后缀指定一个版本号,如app_debezium_connector_v0.1,依次往后递增。

MySQL清除binlog文件的情况

如果连接器停止时间过长时,MySQL服务器会清除旧的二进制日志文件,连接器纪录的最后位置可能会丢失。当连接器重新启动时可能会导致错误。需要修改连接器配置属性database.server.name才可以正常启动。推荐在mysql配置文件中,将binlog失效时间expire_logs_days调大,以避免此类问题。

时区问题

在上述代码示例中配置了database.connectionTimeZone属性为Asia/Shanghai

在快照阶段timestamp类型值会使用该属性配置的时区进行转换,若未配置,则debezium会查询MySQL服务器默认时区。

在增量更新阶段,timestamp类型值默认为UTC时间,需要手动进行转换,此属性在实际测试中配置无效。

添加自定义日期时间类型转换器

debezium默认获取到的时间为时间戳数字或者UTC时间,如果需要获取指定时间格式和时区的数据,需要配置自定义转换器。

配置示例如下:

props.setProperty("converters", "datetime");
props.setProperty("datetime.type",DateTimeConverter.class.getCanonicalName());

其中DateTimeConverter为自定义转换器实现类。

在上述代码示例中,已经配置了一个转换器datetime。通过在datetime下添加自定义属性,例如时间格式和时区,可以在DateTimeConverter转换器实现类下的configure方法中进行引用。需要说明的是在配置连接器属性时必须添加转换器的名称作为前缀,而在configure方法中获取对应属性时,则需要从属性名称中移除前缀,否则无法获取对应的值。

配置示例如下:

props.setProperty("datetime.formatter.pattern.time", "HH:mm:ss");
props.setProperty("datetime.formatter.pattern.date", "yyyy-MM-dd");
props.setProperty("datetime.formatter.pattern.datetime", "yyyy-MM-dd HH:mm:ss");
props.setProperty("datetime.formatter.pattern.timestamp", "yyyy-MM-dd HH:mm:ss");
props.setProperty("datetime.formatter.pattern.timestamp.zoneid", "Asia/Shanghai");

关于配置和使用转换器,可以参考:

https://debezium.io/documentation/reference/stable/development/converters.html#configuring-and-using-converters

decimal类型数据转换

对于decimal类型数据,在默认精度下会显示为"F3A=",通过设置属性decimal.handling.mode为double或string即可解决。

关于快照读取和增量更新

默认情况下,连接器在每次启动时都会首先获取一个全局读锁,然后进行快照读取。 首先获取全局读锁是不推荐的,可以设置snapshot.locking.mode=none使用行锁来代替。

每次启动时,如果当前连接器的逻辑服务器名称未纪录偏移量,那么就会开始进行快照读取,若已存在偏移量则跳过快照阶段。

如果需要更加灵活的快照读取操作,建议使用模式快照+数据临时增量快照。 其中模式快照配置为:

snapshot.mode=schema_only

schema_only模式下,连接器只运行数据库结构的快照,而不进行表数据的快照,并且只捕获从连接器启动以后的最新更改事件。也就是从本地存储的偏移量来开始读取。

对于数据临时增量快照,详细过程可以参考:

https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-ad-hoc-snapshots

具体实现方式可以参考后文。

向Debezium连接器发送信号实现临时增量快照

具体做法是添加一个信号表,用于向Debezium发送信号请求来启动临时快照(目前仅支持触发增量快照)。

在本例中创建信号表为debezium_signal,需要包含id, type, data这三个字段,具体的字段含义可以参考:

https://debezium.io/documentation/reference/1.9/configuration/signalling.html#debezium-signaling-data-collection-structure

添加如下配置:

props.setProperty("table.include.list", "testdb.test_cdc,testdb.debezium_signal");
props.setProperty("signal.data.collection", "testdb.debezium_signal");

其中testdb.test_cdc是要进行数据更改捕获的表,testdb.debezium_signal为创建的信号数据收集表,用于向连接器发送信号。

指定一个信号实例snapshot-001,新增一条信号数据如下:

INSERT INTO debezium_signal (id, type, data) VALUES ('snapshot-001', 'execute-snapshot', '{\"data-collections\": [\"testdb.test_cdc\"]}');

连接器收到此信号后,会启动一个临时快照。

消息恢复处理

当引擎运行时,连接器会主动记录每个源记录中的源偏移量,并且引擎会定期将这些偏移量刷新到持久存储中。当应用程序和引擎正常关闭或崩溃后,将在重新启动时从最后记录的offset恢复读取源信息。

与连接器刷新偏移量相关的配置属性为:

  • offset.flush.interval.ms

    连接器提交偏移量的时间间隔,默认60s

  • offset.flush.timeout.ms

    等待纪录刷新和偏移量数据提交到存储的超时时间,默认5s

由于并不总是能够记录最后位置/偏移量。当重新启动时,在恢复期间可能会重复收到已经处理过的源记录。

此外,网络故障也可能会导致Debezium连接器无法收到写入确认,从而导致同一事件被记录多次(直到收到确认)。因此需要对消息做重复处理。

消息重复处理

每个更改事件消息都包含特定于源的信息source,用于描述事件源元数据,其中包含的信息可用于将此事件与其他事件进行比较,包括事件的起源、事件发生的顺序以及事件是否属于同一事务。

在更改事件消息体中,对应的json结构为:

"source": {
    "version": "1.9.7.Final", //Debezium 版本
    "connector": "mysql", //连接器类型
    "name": "app_debezium_connector_v0.1", //连接器名称
    "ts_ms": 1670234745000, //在数据库中进行更改时的时间戳
    "snapshot": "false",//事件是否为快照的一部分(true, false, incremental)
    "db": "testdb", //包含事件行的数据库名称
    "sequence": null,
    "table": "test_cdc", //包含事件行的表名称
    "server_id": 1, //MySQL服务器ID(如果可用)
    "gtid": null, //全局事务ID(如果可用)
    "file": "binlog.000004", //记录事件的binlog名称
    "pos": 34890, //二进制日志位置
    "row": 0, //事件中的行
    "thread": 106, //创建事件的MySQL线程的ID(仅限非快照)
    "query": null
}

可以使用以下这些信息来识别重复事件,也可用于在消费端做幂等。

  • 事件起源

  • MySQL 服务器的事件时间

  • binlog文件名和位置

  • (GTID)(如果可用)

消息过滤配置(SMT)

默认情况下,Debezium连接器将它接收到的每个数据更改事件传递到消费端。 可以通过自定义SMT来添加过滤逻辑。

过滤器SMT支持与JSR 223集成的脚本语言,要在Debezium中使用表达式语言,必须添加该语言的JSR 223脚本引擎实现。

以Groovy 3脚本引擎为例,添加Maven依赖:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-scripting</artifactId>
    <version>${debezium.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy</artifactId>
    <version>${groovy.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-json</artifactId>
    <version>${groovy.version}</version>
</dependency>

<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-jsr223</artifactId>
    <version>${groovy.version}</version>
</dependency>

过滤器配置属性配置示例:

props.setProperty("transforms", "filter");
props.setProperty("transforms.filter.type", "io.debezium.transforms.Filter");
props.setProperty("transforms.filter.language", "jsr223.groovy");
props.setProperty("transforms.filter.condition", "!topic.matches('__debezium-heartbeat.(.*)') && value.source.table.matches('(test_cdc)')");

其中transforms.filter.condition属性表示过滤条件表达式,计算结果为true时保留,为false时则删除消息。 示例中的表达式为groovy 3脚本,作用是只保留主题名称中不包含“__debezium-heartbeat.”字符串,并且事件行对应表必须为test_cdc的消息。

在条件表达式中单独通过value.source来匹配字段时,需要注意在消息中source字段必须是存在的,否则会抛出异常。例如心跳消息中是不含source字段的。所以还要结合主题名称来进行过滤。

如果对于某些事件不想应用过滤器,可以通过topic.regex属性配置正则表达式,用于评估事件的目标主题名称以确定是否应用过滤逻辑,如果不匹配则不会应用过滤器逻辑,而是原封不动的传递给目标主题。

过滤器主题正则匹配配置示例:

props.setProperty("transforms.filter.topic.regex", "(?!.*(__debezium-heartbeat)).*");

作用是对于所有目标主题包含“__debezium-heartbeat”的事件不通过过滤器处理,而是直接进行传递。

关于消息过滤器的配置和使用,可以参考:

https://debezium.io/documentation/reference/1.9/transformations/filtering.html

关闭墓碑消息(tombstone message)

当执行删除操作时,debezium会生成两个事件

  1. 具有仅包含旧数据的纪录,操作类型为‘d’
  2. 具有null值和相同键的记录(墓碑消息)

墓碑消息作为Kafka的标记,主要作用是删除源记录后,发出墓碑事件(默认行为)允许Kafka完全删除与已删除行的键相关的所有事件,以防为主题启用日志压缩。

消息过滤器不会处理墓碑消息,如果不需要处理墓碑消息,可以配置连接器属性为:

tombstones.on.delete=false

心跳消息(heartbeat event)

心跳消息在不同连接器中的实现方式和作用不尽相同。发送心跳消息的作用是避免连接器存储的偏移量数据过于陈旧,从而减少连接器重新启动时需要重新发送的更改事件的数量。

其中heartbeat.interval.ms属性用于配置发送心跳消息的频率。在MySQL连接器中,配置此参数并不会真正按照此周期定期发送消息,而是在处理了更改事件后,检查heartbeat的间隔计时器是否过期,如果过期则发送心跳消息。同时heartbeat的发送间隔也并不是固定的,而是半规则间隔(semi-regular interval),也就是说此参数配置的是间隔时间的最小值。

heartbeat.action.query属性用于配置发送心跳消息时在源数据库执行的语句,在示例中,创建了一个debezium_heartbeat心跳表,字段connector为连接器名称,作为主键,last_heartbeat为最近一次的心跳时间。

示例代码中用于发送心跳时执行的语句为:

INSERT INTO testdb.debezium_heartbeat (connector, last_heartbeat) VALUES ('app_debezium_connector_v0.2', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = now();

始终仅插入或更新唯一一条心跳信息。

关于心跳消息的机制,可以参考官方论坛的部分讨论:

https://groups.google.com/g/debezium/c/JFXqYFvFIns/m/SxWF9OXCAQAJ

数据更改事件对象(ChangeEvent)

ChangeEvent有key, value, destination三个属性。

其中key为事件键,通常为事件纪录来源表的主键。value为事件值,表示事件纪录的内容,包含事件来源(source)、数据变更前的纪录(before)、数据变更后的纪录(after)。destination表示当前事件预期要发送的目的地名称,由连接器名称、纪录来源数据库名、纪录来源表名组成,如果变更纪录发送到Kafka,那么destination表示要发送的目标主题。

变更事件纪录的类型为:更新(UPDATE)、新增(CREATE)、删除(DELETE)。其中key、destination在对应事件中都是相同的。
keydestination
{"id":4}app_debezium_connector_v0.2.testdb.test_cdc

下面是各数据变更事件纪录value的内容示例:

  • 更新事件
  {
    "before": {
        "id": 4,
        "test_text": "254",
        "test_double": "11.09",
        "test_time": "19:01:01",
        "test_datetime": "2022-12-07 19:01:03",
        "test_timestamp": "2022-12-15 15:18:30"
    },
    "after": {
        "id": 4,
        "test_text": "255",
        "test_double": "11.09",
        "test_time": "19:01:01",
        "test_datetime": "2022-12-07 19:01:03",
        "test_timestamp": "2022-12-15 15:20:02"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "app_debezium_connector_v0.2",
        "ts_ms": 1671088802000,
        "snapshot": "false",
        "db": "testdb",
        "sequence": null,
        "table": "test_cdc",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000005",
        "pos": 27953,
        "row": 0,
        "thread": 3450,
        "query": null
    },
    "op": "u",
    "ts_ms": 1671088802985,
    "transaction": null
}
  • 新增事件
  {
    "before": null,
    "after": {
        "id": 4,
        "test_text": "255",
        "test_double": "11.09",
        "test_time": "19:01:01",
        "test_datetime": "2022-12-07 19:01:03",
        "test_timestamp": "2022-12-15 15:20:02"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "app_debezium_connector_v0.2",
        "ts_ms": 1671088829000,
        "snapshot": "false",
        "db": "testdb",
        "sequence": null,
        "table": "test_cdc",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000005",
        "pos": 29354,
        "row": 0,
        "thread": 3450,
        "query": null
    },
    "op": "c",
    "ts_ms": 1671088829475,
    "transaction": null
}
  • 删除事件
  {
    "before": {
        "id": 4,
        "test_text": "255",
        "test_double": "11.09",
        "test_time": "19:01:01",
        "test_datetime": "2022-12-07 19:01:03",
        "test_timestamp": "2022-12-15 15:20:02"
    },
    "after": null,
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "app_debezium_connector_v0.2",
        "ts_ms": 1671088824000,
        "snapshot": "false",
        "db": "testdb",
        "sequence": null,
        "table": "test_cdc",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000005",
        "pos": 28662,
        "row": 0,
        "thread": 3450,
        "query": null
    },
    "op": "d",
    "ts_ms": 1671088824425,
    "transaction": null
}

Debezium连接器、引擎属性及使用说明文档

Debezium文档较为分散,以下是常见属性及功能配置的文档链接: