Debezium是一个开源分布式数据监控平台,可以持续捕获和流式传输对外部数据库系统所做的实时修改(CDC)。通过CDC操作,Debezium 将外部数据库转换为实时事件流,从而获取和记录在相应数据库应用程序上所做的行级更改。
通常Debezium会配合Kafka Connect集群使用,以确保高级别的容错性和可靠性。对于一般级别的应用来说,使用嵌入式Debezium引擎即可。以下为Debezium MySQL连接器的详细配置及使用。
Debezium Version: 1.9
MySQL Version: 8.0.31
JDK Version: 1.8.0.31
https://debezium.io/documentation/reference/1.9/index.html
https://debezium.io/documentation/reference/1.9/development/engine.html
CREATE USER 'cdcuser' IDENTIFIED BY 'password';
GRANT SELECT, INSERT, UPDATE, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdcuser'@'%';
FLUSH PRIVILEGES;
注意RELOAD权限用于在快照阶段获取全局锁。 如果只需要增量更新则不需要添加此权限,连接器属性需要配置为:
snapshot.mode=schema_only
snapshot.locking.mode=none
如果需要快照读取阶段(默认),同时避免获取全局锁,连接器属性需要配置为:
snapshot.locking.mode=none
snapshot.locking.mode默认为minimal,会获取全局读锁,配置为none时,只获取行锁(InnoDB)。
官方文档中此处说明为:
防止连接器在快照期间获取任何表锁。虽然所有快照模式都允许使用此设置,但当且仅当快照运行时没有架构更改时才可以安全使用。对于使用MyISAM 引擎定义的表,尽管在MyISAM获取表锁时设置了此属性,但表仍将被锁定。此行为不同于InnoDB 引擎,后者获取行级锁。
一般在数据快照阶段不会做数据库表结构更改,建议设置为none,毕竟全局锁的风险太大,不适合用于线上环境。
确认binlog已开启写入功能,并且binlog_format=ROW
my.cnf配置如下:
log_bin=binlog # 开启 binlog,在MySQL 8.0中默认开启
binlog-format=ROW # 选择 ROW 模式
server_id=1 #不要和连接器中配置的database.server.id重复
<properties>
<debezium.version>1.9.7.Final</debezium.version>
<groovy.version>3.0.7</groovy.version>
</properties>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>${groovy.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${groovy.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>
<version>${groovy.version}</version>
</dependency>
DebeziumDemo
import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.KafkaDatabaseHistory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import test.cdc.DateTimeConverter;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class DebeziumDemo {
public static void main(String[] args) throws InterruptedException {
Configuration config = Configuration.create().build();
final Properties props = config.asProperties();
//配置连接器基本属性
props.setProperty("name", "app_debezium_engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "cdcuser");
props.setProperty("database.password", "password");
props.setProperty("database.server.name", "app_debezium_connector_v0.2");
props.setProperty("database.server.id", "85744");
props.setProperty("database.include.list", "testdb");
props.setProperty("database.connectionTimeZone", "Asia/Shanghai");
props.setProperty("table.include.list", "testdb.test_cdc,testdb.debezium_signal,testdb.debezium_heartbeat");
props.setProperty("decimal.handling.mode", "string");
props.setProperty("include.schema.changes", "false");
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");
props.setProperty("poll.interval.ms", "1000");
props.setProperty("connect.timeout.ms", "10000");
props.setProperty("tombstones.on.delete", "false");
//数据库连接属性(以database为前缀)
props.setProperty("database.useSSL", "false");
props.setProperty("database.allowPublicKeyRetrieval", "true");
//配置连接器高级属性
props.setProperty("converters", DateTimeConverter.CONVERTERS_NAME);
props.setProperty(DateTimeConverter.CONVERTERS_TYPE, DateTimeConverter.class.getCanonicalName());
props.setProperty("snapshot.mode", "schema_only");
props.setProperty("snapshot.locking.mode", "none");
props.setProperty("heartbeat.interval.ms", "10000");
props.setProperty("heartbeat.action.query", "INSERT INTO testdb.debezium_heartbeat (connector, last_heartbeat) VALUES ('app_debezium_connector_v0.2', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = now();");
props.setProperty("signal.data.collection", "testdb.debezium_signal");
props.setProperty("incremental.snapshot.chunk.size", "1024");
//配置引擎属性
props.setProperty("offset.flush.interval.ms", "5000");
props.setProperty("offset.flush.timeout.ms", "10000");
//使用本地存储偏移量
//props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//props.setProperty("offset.storage.file.filename", "/opt/storage/offsets.dat");
//使用kafka存储偏移量
props.setProperty("bootstrap.servers", "kafka-node:9092");
props.setProperty("offset.storage", KafkaOffsetBackingStore.class.getCanonicalName());
props.setProperty("offset.storage.topic", "debezium.offset.storage");
props.setProperty("offset.storage.partitions", "1");
props.setProperty("offset.storage.replication.factor", "1");
//配置数据库历史属性
//使用本地文件存储数据库模式历史
//props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
//props.setProperty("database.history.file.filename", "/opt/storage/dbhistory.dat");
//使用kafka存储数据库模式历史
props.setProperty("database.history", KafkaDatabaseHistory.class.getCanonicalName());
props.setProperty("database.history.kafka.topic", "debezium.database.history");
props.setProperty("database.history.kafka.bootstrap.servers", "kafka-node:9092");
//从json序列化纪录中排除数据库模式信息
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
//日期时间类型转换器自定义属性(属性前缀为转换器名称)
props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIME, "HH:mm:ss");
props.setProperty(DateTimeConverter.FORMATTER_PATTERN_DATE, "yyyy-MM-dd");
props.setProperty(DateTimeConverter.FORMATTER_PATTERN_DATETIME, "yyyy-MM-dd HH:mm:ss");
props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIMESTAMP, "yyyy-MM-dd HH:mm:ss");
props.setProperty(DateTimeConverter.FORMATTER_PATTERN_TIMESTAMP_ZONID, "Asia/Shanghai");
//自定义消息过滤配置
props.setProperty("transforms", "filter");
props.setProperty("transforms.filter.type", "io.debezium.transforms.Filter");
//props.setProperty("transforms.filter.topic.regex", "(?!.*(__debezium-heartbeat)).*");
props.setProperty("transforms.filter.language", "jsr223.groovy");
props.setProperty("transforms.filter.condition", "!topic.matches('__debezium-heartbeat.(.*)') && value.source.table.matches('(test_cdc)')");
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.using((boolean success, String message, Throwable error) -> {
if (success) {
log.info("[CompletionCallback] \nsuccess => {} \nmessage => {}", true, message);
return;
}
log.error("[CompletionCallback] \nsuccess => {}", false, error);
})
.notifying(new ChangeEventConsumer())
.build()
) {
// 以异步方式运行引擎
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
// 设置60s后停止接收新的事件,仅处理已接收的事件
Thread.sleep(60000);
executor.shutdown();
// 等待引擎处理剩余事件
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
log.info("The embedded engine cant shut down");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static class ChangeEventConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
@Override
public void handleBatch(List<ChangeEvent<String, String>> list, DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
for (ChangeEvent<String, String> changeEvent : list) {
System.out.println("[key]:" + changeEvent.key());
System.out.println("[value]:" + changeEvent.value());
System.out.println("[destination]:" + changeEvent.destination());
// calling for each record
committer.markProcessed(changeEvent);
}
// calling when this batch is finished
committer.markBatchFinished();
}
}
}
DateTimeConverter
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Properties;
/**
* 自定义日期时间类型转换器
*/
@Slf4j
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
public final static String CONVERTERS_NAME = "datetime";
public final static String CONVERTERS_TYPE = CONVERTERS_NAME + ".type";
public final static String FORMATTER_PATTERN_TIME = buildFullPropertyName("formatter.pattern.time");
public final static String FORMATTER_PATTERN_DATE = buildFullPropertyName("formatter.pattern.date");
public final static String FORMATTER_PATTERN_DATETIME = buildFullPropertyName("formatter.pattern.datetime");
public final static String FORMATTER_PATTERN_TIMESTAMP = buildFullPropertyName("formatter.pattern.timestamp");
public final static String FORMATTER_PATTERN_TIMESTAMP_ZONID = buildFullPropertyName("formatter.pattern.timestamp.zoneid");
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
private static String buildFullPropertyName(String shortPropertyName) {
return CONVERTERS_NAME + "." + shortPropertyName;
}
private static String buildShortPropertyName(String fullPropertyName) {
return fullPropertyName.replaceFirst(CONVERTERS_NAME + ".", "");
}
@Override
public void configure(Properties properties) {
Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIME)))
.ifPresent(f -> timeFormatter = DateTimeFormatter.ofPattern(f));
Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_DATE)))
.ifPresent(f -> dateFormatter = DateTimeFormatter.ofPattern(f));
Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_DATETIME)))
.ifPresent(f -> dateTimeFormatter = DateTimeFormatter.ofPattern(f));
Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIMESTAMP)))
.ifPresent(f -> timestampFormatter = DateTimeFormatter.ofPattern(f));
Optional.ofNullable(properties.getProperty(buildShortPropertyName(FORMATTER_PATTERN_TIMESTAMP_ZONID)))
.ifPresent(z -> timestampZoneId = ZoneId.of(z));
}
@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
String columnType = relationalColumn.typeName().toUpperCase();
Converter converter = null;
switch (columnType) {
case "TIME":
converter = (x) -> {
if (x instanceof Duration)
return LocalTime.ofNanoOfDay(((Duration) x).toNanos()).format(timeFormatter);
if (x instanceof String)
return LocalTime.ofNanoOfDay(Duration.parse((String) x).toNanos()).format(timeFormatter);
log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
.getName());
return null;
};
break;
case "DATE":
converter = (x) -> {
if (x instanceof LocalDate)
return ((LocalDate) x).format(dateFormatter);
if (x instanceof String)
return LocalDate.parse((String) x).format(dateFormatter);
log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
.getName());
return null;
};
break;
case "DATETIME":
converter = (x) -> {
if (x instanceof LocalDateTime)// 增量更新阶段时间类型为LocalDateTime
return ((LocalDateTime) x).format(dateTimeFormatter);
if (x instanceof Timestamp)// 快照读取阶段时间类型为Timestamp
return ((Timestamp) x).toLocalDateTime().format(dateTimeFormatter);
if (x instanceof String)
return LocalDateTime.parse((String) x).format(dateTimeFormatter);
log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
.getName());
return null;
};
break;
case "TIMESTAMP":
converter = (x) -> {
if (x instanceof ZonedDateTime)// 增量更新阶段时间类型为ZonedDateTime, 需指定时区
return ((ZonedDateTime) x).withZoneSameInstant(timestampZoneId).format(timestampFormatter);
if (x instanceof Timestamp)// 快照读取阶段时间类型为Timestamp, 使用数据库服务器默认时区
return ((Timestamp) x).toLocalDateTime().format(timestampFormatter);
if (x instanceof String)
return ZonedDateTime.parse((String) x).withZoneSameInstant(timestampZoneId)
.format(timestampFormatter);
log.warn("[DateTimeConverter] unhandled column type({}), current type({})", columnType, x.getClass()
.getName());
return null;
};
}
if (null != converter) {
converterRegistration.register(SchemaBuilder.string(), converter);
log.info("register converter for columnType {} to schema {}", columnType, SchemaBuilder.string().type());
}
}
}
如果正在捕获的数据库表发生结构变更,则从Debezium发出的更改事件也会实时生效。例如在要捕获的表中新增字段,在Debezium变更事件消息中,新字段均会出现在事件前后的状态中。
为现有的连接器添加一个新表进行cdc操作时,无论连接器是否重启,均不会生效。
官方论坛关于此问题的讨论:
查阅官方文档可知,database.server.name
属性是为Debezium捕获特定数据的数据库提供的一个逻辑名称。可以推测这个属性和正在捕获的表是对应的。因此对于后续在捕获名单中添加新表的情况,可以在属性值的后缀指定一个版本号,如app_debezium_connector_v0.1,依次往后递增。
如果连接器停止时间过长时,MySQL服务器会清除旧的二进制日志文件,连接器纪录的最后位置可能会丢失。当连接器重新启动时可能会导致错误。需要修改连接器配置属性database.server.name
才可以正常启动。推荐在mysql配置文件中,将binlog失效时间expire_logs_days
调大,以避免此类问题。
在上述代码示例中配置了database.connectionTimeZone
属性为Asia/Shanghai
。
在快照阶段timestamp类型值会使用该属性配置的时区进行转换,若未配置,则debezium会查询MySQL服务器默认时区。
在增量更新阶段,timestamp类型值默认为UTC时间,需要手动进行转换,此属性在实际测试中配置无效。
debezium默认获取到的时间为时间戳数字或者UTC时间,如果需要获取指定时间格式和时区的数据,需要配置自定义转换器。
配置示例如下:
props.setProperty("converters", "datetime");
props.setProperty("datetime.type",DateTimeConverter.class.getCanonicalName());
其中DateTimeConverter为自定义转换器实现类。
在上述代码示例中,已经配置了一个转换器datetime。通过在datetime下添加自定义属性,例如时间格式和时区,可以在DateTimeConverter转换器实现类下的configure方法中进行引用。需要说明的是在配置连接器属性时必须添加转换器的名称作为前缀,而在configure方法中获取对应属性时,则需要从属性名称中移除前缀,否则无法获取对应的值。
配置示例如下:
props.setProperty("datetime.formatter.pattern.time", "HH:mm:ss");
props.setProperty("datetime.formatter.pattern.date", "yyyy-MM-dd");
props.setProperty("datetime.formatter.pattern.datetime", "yyyy-MM-dd HH:mm:ss");
props.setProperty("datetime.formatter.pattern.timestamp", "yyyy-MM-dd HH:mm:ss");
props.setProperty("datetime.formatter.pattern.timestamp.zoneid", "Asia/Shanghai");
关于配置和使用转换器,可以参考:
对于decimal类型数据,在默认精度下会显示为"F3A=",通过设置属性decimal.handling.mode
为double或string即可解决。
默认情况下,连接器在每次启动时都会首先获取一个全局读锁,然后进行快照读取。 首先获取全局读锁是不推荐的,可以设置snapshot.locking.mode=none
使用行锁来代替。
每次启动时,如果当前连接器的逻辑服务器名称未纪录偏移量,那么就会开始进行快照读取,若已存在偏移量则跳过快照阶段。
如果需要更加灵活的快照读取操作,建议使用模式快照+数据临时增量快照。 其中模式快照配置为:
snapshot.mode=schema_only
在schema_only
模式下,连接器只运行数据库结构的快照,而不进行表数据的快照,并且只捕获从连接器启动以后的最新更改事件。也就是从本地存储的偏移量来开始读取。
对于数据临时增量快照,详细过程可以参考:
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-ad-hoc-snapshots
具体实现方式可以参考后文。
具体做法是添加一个信号表,用于向Debezium发送信号请求来启动临时快照(目前仅支持触发增量快照)。
在本例中创建信号表为debezium_signal,需要包含id, type, data这三个字段,具体的字段含义可以参考:
添加如下配置:
props.setProperty("table.include.list", "testdb.test_cdc,testdb.debezium_signal");
props.setProperty("signal.data.collection", "testdb.debezium_signal");
其中testdb.test_cdc是要进行数据更改捕获的表,testdb.debezium_signal为创建的信号数据收集表,用于向连接器发送信号。
指定一个信号实例snapshot-001,新增一条信号数据如下:
INSERT INTO debezium_signal (id, type, data) VALUES ('snapshot-001', 'execute-snapshot', '{\"data-collections\": [\"testdb.test_cdc\"]}');
连接器收到此信号后,会启动一个临时快照。
当引擎运行时,连接器会主动记录每个源记录中的源偏移量,并且引擎会定期将这些偏移量刷新到持久存储中。当应用程序和引擎正常关闭或崩溃后,将在重新启动时从最后记录的offset恢复读取源信息。
与连接器刷新偏移量相关的配置属性为:
offset.flush.interval.ms
连接器提交偏移量的时间间隔,默认60s
offset.flush.timeout.ms
等待纪录刷新和偏移量数据提交到存储的超时时间,默认5s
由于并不总是能够记录最后位置/偏移量。当重新启动时,在恢复期间可能会重复收到已经处理过的源记录。
此外,网络故障也可能会导致Debezium连接器无法收到写入确认,从而导致同一事件被记录多次(直到收到确认)。因此需要对消息做重复处理。
每个更改事件消息都包含特定于源的信息source
,用于描述事件源元数据,其中包含的信息可用于将此事件与其他事件进行比较,包括事件的起源、事件发生的顺序以及事件是否属于同一事务。
在更改事件消息体中,对应的json结构为:
"source": {
"version": "1.9.7.Final", //Debezium 版本
"connector": "mysql", //连接器类型
"name": "app_debezium_connector_v0.1", //连接器名称
"ts_ms": 1670234745000, //在数据库中进行更改时的时间戳
"snapshot": "false",//事件是否为快照的一部分(true, false, incremental)
"db": "testdb", //包含事件行的数据库名称
"sequence": null,
"table": "test_cdc", //包含事件行的表名称
"server_id": 1, //MySQL服务器ID(如果可用)
"gtid": null, //全局事务ID(如果可用)
"file": "binlog.000004", //记录事件的binlog名称
"pos": 34890, //二进制日志位置
"row": 0, //事件中的行
"thread": 106, //创建事件的MySQL线程的ID(仅限非快照)
"query": null
}
可以使用以下这些信息来识别重复事件,也可用于在消费端做幂等。
事件起源
MySQL 服务器的事件时间
binlog文件名和位置
(GTID)(如果可用)
默认情况下,Debezium连接器将它接收到的每个数据更改事件传递到消费端。 可以通过自定义SMT来添加过滤逻辑。
过滤器SMT支持与JSR 223集成的脚本语言,要在Debezium中使用表达式语言,必须添加该语言的JSR 223脚本引擎实现。
以Groovy 3脚本引擎为例,添加Maven依赖:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>${groovy.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${groovy.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>
<version>${groovy.version}</version>
</dependency>
过滤器配置属性配置示例:
props.setProperty("transforms", "filter");
props.setProperty("transforms.filter.type", "io.debezium.transforms.Filter");
props.setProperty("transforms.filter.language", "jsr223.groovy");
props.setProperty("transforms.filter.condition", "!topic.matches('__debezium-heartbeat.(.*)') && value.source.table.matches('(test_cdc)')");
其中transforms.filter.condition
属性表示过滤条件表达式,计算结果为true时保留,为false时则删除消息。 示例中的表达式为groovy 3脚本,作用是只保留主题名称中不包含“__debezium-heartbeat.”字符串,并且事件行对应表必须为test_cdc的消息。
在条件表达式中单独通过value.source
来匹配字段时,需要注意在消息中source
字段必须是存在的,否则会抛出异常。例如心跳消息中是不含source
字段的。所以还要结合主题名称来进行过滤。
如果对于某些事件不想应用过滤器,可以通过topic.regex
属性配置正则表达式,用于评估事件的目标主题名称以确定是否应用过滤逻辑,如果不匹配则不会应用过滤器逻辑,而是原封不动的传递给目标主题。
过滤器主题正则匹配配置示例:
props.setProperty("transforms.filter.topic.regex", "(?!.*(__debezium-heartbeat)).*");
作用是对于所有目标主题包含“__debezium-heartbeat”的事件不通过过滤器处理,而是直接进行传递。
关于消息过滤器的配置和使用,可以参考:
https://debezium.io/documentation/reference/1.9/transformations/filtering.html
当执行删除操作时,debezium会生成两个事件
墓碑消息作为Kafka的标记,主要作用是删除源记录后,发出墓碑事件(默认行为)允许Kafka完全删除与已删除行的键相关的所有事件,以防为主题启用日志压缩。
消息过滤器不会处理墓碑消息,如果不需要处理墓碑消息,可以配置连接器属性为:
tombstones.on.delete=false
心跳消息在不同连接器中的实现方式和作用不尽相同。发送心跳消息的作用是避免连接器存储的偏移量数据过于陈旧,从而减少连接器重新启动时需要重新发送的更改事件的数量。
其中heartbeat.interval.ms
属性用于配置发送心跳消息的频率。在MySQL连接器中,配置此参数并不会真正按照此周期定期发送消息,而是在处理了更改事件后,检查heartbeat的间隔计时器是否过期,如果过期则发送心跳消息。同时heartbeat的发送间隔也并不是固定的,而是半规则间隔(semi-regular interval),也就是说此参数配置的是间隔时间的最小值。
heartbeat.action.query
属性用于配置发送心跳消息时在源数据库执行的语句,在示例中,创建了一个debezium_heartbeat心跳表,字段connector
为连接器名称,作为主键,last_heartbeat
为最近一次的心跳时间。
示例代码中用于发送心跳时执行的语句为:
INSERT INTO testdb.debezium_heartbeat (connector, last_heartbeat) VALUES ('app_debezium_connector_v0.2', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = now();
始终仅插入或更新唯一一条心跳信息。
关于心跳消息的机制,可以参考官方论坛的部分讨论:
https://groups.google.com/g/debezium/c/JFXqYFvFIns/m/SxWF9OXCAQAJ
ChangeEvent有key, value, destination三个属性。
其中key为事件键,通常为事件纪录来源表的主键。value为事件值,表示事件纪录的内容,包含事件来源(source)、数据变更前的纪录(before)、数据变更后的纪录(after)。destination表示当前事件预期要发送的目的地名称,由连接器名称、纪录来源数据库名、纪录来源表名组成,如果变更纪录发送到Kafka,那么destination表示要发送的目标主题。
变更事件纪录的类型为:更新(UPDATE)、新增(CREATE)、删除(DELETE)。其中key、destination在对应事件中都是相同的。
key | destination |
---|---|
{"id":4} | app_debezium_connector_v0.2.testdb.test_cdc |
下面是各数据变更事件纪录value的内容示例:
{
"before": {
"id": 4,
"test_text": "254",
"test_double": "11.09",
"test_time": "19:01:01",
"test_datetime": "2022-12-07 19:01:03",
"test_timestamp": "2022-12-15 15:18:30"
},
"after": {
"id": 4,
"test_text": "255",
"test_double": "11.09",
"test_time": "19:01:01",
"test_datetime": "2022-12-07 19:01:03",
"test_timestamp": "2022-12-15 15:20:02"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "app_debezium_connector_v0.2",
"ts_ms": 1671088802000,
"snapshot": "false",
"db": "testdb",
"sequence": null,
"table": "test_cdc",
"server_id": 1,
"gtid": null,
"file": "binlog.000005",
"pos": 27953,
"row": 0,
"thread": 3450,
"query": null
},
"op": "u",
"ts_ms": 1671088802985,
"transaction": null
}
{
"before": null,
"after": {
"id": 4,
"test_text": "255",
"test_double": "11.09",
"test_time": "19:01:01",
"test_datetime": "2022-12-07 19:01:03",
"test_timestamp": "2022-12-15 15:20:02"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "app_debezium_connector_v0.2",
"ts_ms": 1671088829000,
"snapshot": "false",
"db": "testdb",
"sequence": null,
"table": "test_cdc",
"server_id": 1,
"gtid": null,
"file": "binlog.000005",
"pos": 29354,
"row": 0,
"thread": 3450,
"query": null
},
"op": "c",
"ts_ms": 1671088829475,
"transaction": null
}
{
"before": {
"id": 4,
"test_text": "255",
"test_double": "11.09",
"test_time": "19:01:01",
"test_datetime": "2022-12-07 19:01:03",
"test_timestamp": "2022-12-15 15:20:02"
},
"after": null,
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "app_debezium_connector_v0.2",
"ts_ms": 1671088824000,
"snapshot": "false",
"db": "testdb",
"sequence": null,
"table": "test_cdc",
"server_id": 1,
"gtid": null,
"file": "binlog.000005",
"pos": 28662,
"row": 0,
"thread": 3450,
"query": null
},
"op": "d",
"ts_ms": 1671088824425,
"transaction": null
}
Debezium文档较为分散,以下是常见属性及功能配置的文档链接:
MySQL中的配置及准备工作
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#setting-up-mysql
Debezium引擎模块及API调用示例
https://debezium.io/documentation/reference/1.9/development/engine.html
高级纪录消费接口API使用
https://debezium.io/documentation/reference/1.9/development/engine.html#advanced-consuming
引擎属性
https://debezium.io/documentation/reference/1.9/development/engine.html#engine-properties
必需的连接器配置属性
高级连接器配置属性
配置数据库连接属性
数据库历史属性
https://debezium.io/documentation/reference/1.9/development/engine.html#database-history-properties
消息序列化配置属性
https://debezium.io/documentation/reference/1.9/configuration/avro.html
数据变更事件及事件消息属性
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-events
执行快照的工作流程
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-snapshots
时间类型说明
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-temporal-types
配置和使用转换器
向Debezium连接器发送信号
https://debezium.io/documentation/reference/1.9/configuration/signalling.html
消息过滤
https://debezium.io/documentation/reference/1.9/transformations/filtering.html
墓碑事件(tombstone events)
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-tombstone-events
故障问题处理
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-when-things-go-wrong
常见问题解答