High Availability for Debezium Engine Based on Zookeeper



The earlier article "Running the MySQL Connector with Debezium Engine" showed how to use Debezium Engine to run the MySQL connector.

Building on that, this article shows how to make Debezium Engine highly available with Zookeeper, and how to store the database schema history and offsets in Zookeeper.

High availability through leader election with Curator

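Apache Curator offers two leader-election recipes for a CuratorFramework client: LeaderSelector and LeaderLatch. With LeaderSelector, a node holds leadership only while its takeLeadership() callback is running and gives it up as soon as that method returns. LeaderLatch keeps leadership until the latch is closed or the connection is lost. A Debezium engine is meant to run as a long-lived leader, so LeaderLatch is the better fit.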

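The Debezium properties set database.server.id, the numeric ID of this MySQL connector as a database client. The connector joins the MySQL cluster as a replica, so this ID must be unique among all database processes currently running in the cluster. Nodes that run the same code usually supply the same server ID, which means two or more MySQL connectors must not run at the same time.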

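Our approach is to let only the leader node run the Debezium engine, while the other client nodes stay on standby. The engine is started in LeaderLatchListener#isLeader(), at which point it begins capturing database change events. A call to isLeader() does not mean the node stays leader forever: when the notLeader() callback fires, the engine instance is closed, so the rule that only the leader runs an engine always holds.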
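The rule "only the leader runs an engine" can be sketched without Zookeeper as a small guard object. This is a minimal stdlib-only illustration, not the article's code: `LeaderGuardedRunner` and `FakeEngine` are hypothetical names, `FakeEngine` stands in for `DebeziumEngine` (which is also a `Runnable` with a `close()` method), and the two callbacks would be wired to `LeaderLatchListener#isLeader()` / `notLeader()`. It builds a fresh task per leadership term and ignores a duplicate "became leader" callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Runs a freshly built task only while this node holds leadership (hypothetical helper). */
final class LeaderGuardedRunner<T extends Runnable & AutoCloseable> {
    private final Supplier<T> factory;
    // An engine's run() blocks, so tasks execute on a dedicated worker thread
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicReference<T> current = new AtomicReference<>();

    LeaderGuardedRunner(Supplier<T> factory) {
        this.factory = factory;
    }

    /** "Became leader" callback; a duplicate call does not start a second task. */
    void onLeader() {
        if (current.get() != null) {
            return;
        }
        T task = factory.get();
        if (current.compareAndSet(null, task)) {
            executor.execute(task);
        }
    }

    /** "Lost leadership" callback; closing the task makes its run() return. */
    void onNotLeader() {
        T task = current.getAndSet(null);
        if (task == null) {
            return;
        }
        try {
            task.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close task", e);
        }
    }

    /** True from onLeader() until the matching onNotLeader(). */
    boolean isActive() {
        return current.get() != null;
    }

    void shutdown() {
        onNotLeader();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

/** Stand-in for DebeziumEngine: run() blocks until close() is called. */
final class FakeEngine implements Runnable, AutoCloseable {
    private final CountDownLatch closed = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            closed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        closed.countDown();
    }
}
```

Building a new task on every `onLeader()` avoids relying on restarting an instance that has already been closed.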

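If the engine service acting as leader loses its connection to the Zookeeper server (for example, because it crashes), the other engine services compete for leadership. The node that wins starts its own engine and continues processing database change events, including those that occurred between the old engine going down and the new engine starting: the new engine resumes from the last committed offset, provided all nodes share the same offset storage (such as the Zookeeper-based store described below). This keeps the overall downtime short. (The failover time depends mainly on the client session timeout, configured with CuratorFrameworkFactory.Builder#sessionTimeoutMs.)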
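Why the new leader also covers the downtime window can be shown with a toy model. This is an illustrative sketch, not Debezium code: `FailoverDemo`, `runNode` and the `"binlog"` key are hypothetical, a plain `Map` plays the role of the shared offset store, and the node commits "next event to read" every `commitEvery` events (in Debezium Engine the commit cadence is governed by settings such as `offset.flush.interval.ms`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy failover model: a node resumes from the offset committed by the previous leader. */
final class FailoverDemo {
    static final String OFFSET_KEY = "binlog";

    /**
     * Delivers events from the committed offset up to totalEvents, committing the index of the
     * next event after every commitEvery deliveries. After crashAfter deliveries it stops
     * without a final commit, like a crashed leader.
     */
    static List<Integer> runNode(Map<String, Integer> sharedStore, int totalEvents,
                                 int commitEvery, int crashAfter) {
        List<Integer> delivered = new ArrayList<>();
        int next = sharedStore.getOrDefault(OFFSET_KEY, 0);
        for (int event = next; event < totalEvents && delivered.size() < crashAfter; event++) {
            delivered.add(event); // hand the event to the downstream consumer
            if (delivered.size() % commitEvery == 0) {
                sharedStore.put(OFFSET_KEY, event + 1); // commit: next event to read
            }
        }
        return delivered;
    }
}
```

With 10 events and a commit every 3, a leader that crashes after 5 events leaves offset 3 behind, so the next leader delivers events 3 to 9: nothing is lost, but events 3 and 4 are delivered twice. Downstream consumers should therefore tolerate duplicates.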

Here is the code:

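// Build a new engine each time this node becomes leader instead of re-running an instance that was already closed
Supplier<DebeziumEngine<ChangeEvent<String, String>>> engineFactory = () -> DebeziumEngine.create(Json.class)
        .using(props)
        .using((boolean success, String message, Throwable error) -> {
            if (success) {
                log.info("[CompletionCallback] \nsuccess => {} \nmessage => {}", true, message);
                return;
            }
            log.error("[CompletionCallback] \nsuccess => {} \nmessage => {}", false, message, error);
        })
        .notifying(new ChangeEventConsumer())
        .build();

CuratorFramework curatorClient = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181,localhost:2182,localhost:2183")
        .sessionTimeoutMs(5000)
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .connectionTimeoutMs(4000)
        .build();
curatorClient.start();

// Wait until the Zookeeper connection is established
curatorClient.blockUntilConnected();

// Listener for client connection state changes
curatorClient.getConnectionStateListenable().addListener((client, connectionState) -> {
    log.info("Current client connection state: {}", connectionState.name());
    log.info("Client connected: {}", connectionState.isConnected());
});

// engine.run() blocks until the engine stops, so it runs on a dedicated thread
ExecutorService executor = Executors.newSingleThreadExecutor();
// The engine running on this node, or null while the node is on standby
AtomicReference<DebeziumEngine<ChangeEvent<String, String>>> runningEngine = new AtomicReference<>();
LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
    @Override
    public void isLeader() {
        log.info("This client node has become the leader");
        DebeziumEngine<ChangeEvent<String, String>> engine = engineFactory.get();
        if (runningEngine.compareAndSet(null, engine)) {
            executor.execute(engine);
        }
    }

    @Override
    public void notLeader() {
        log.info("This client node is no longer the leader");
        DebeziumEngine<ChangeEvent<String, String>> engine = runningEngine.getAndSet(null);
        if (engine == null) {
            return;
        }
        try {
            engine.close();
        } catch (IOException e) {
            log.error("Failed to close the Debezium engine", e);
        }
    }
};
LeaderLatch leaderLatch = new LeaderLatch(curatorClient, "/leader");
leaderLatch.addListener(leaderLatchListener);
try {
    // start() returns immediately; leadership changes are delivered to the listener
    leaderLatch.start();
} catch (Exception e) {
    log.error("Failed to start the leader latch", e);
}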

Storing the database schema history and offsets in Zookeeper

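Modeled on FileDatabaseHistory and FileOffsetBackingStore, the classes below store the database schema history and the offsets in Zookeeper. The schema history uses persistent sequential nodes: every schema change that arrives creates a new sequential child node. When the engine starts and loads the schema history, it first reads the names of all children of the configured parent node, sorts them by sequence number, and then reads the data stored in each child in turn. Storing offsets is simpler: each time offsets arrive, they are written to the configured node, replacing the previous value.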
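The ordering step relies on how ZooKeeper names sequential nodes: it appends a monotonically increasing counter, zero-padded to 10 digits, to the requested path, so `/child` becomes `child0000000001`, `child0000000002`, and so on. The sketch below (hypothetical class `SequentialChildren`) sorts child names by that numeric suffix. Because the counter is fixed-width and every child shares the `child` prefix, a plain lexicographic sort, as `Collections.sort` performs in the listing below, yields the same order; parsing the suffix just makes the intent explicit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Orders the children of the history node in creation order (hypothetical helper). */
final class SequentialChildren {
    // ZooKeeper appends a 10-digit, zero-padded counter to PERSISTENT_SEQUENTIAL node names
    static final int SEQUENCE_DIGITS = 10;

    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(childName.length() - SEQUENCE_DIGITS));
    }

    /** Returns a sorted copy: the order in which schema changes were stored. */
    static List<String> inCreationOrder(List<String> childNames) {
        List<String> sorted = new ArrayList<>(childNames);
        sorted.sort(Comparator.comparingLong(SequentialChildren::sequenceOf));
        return sorted;
    }
}
```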

Here is the code.

Add the following properties to the connector configuration:

// Store offsets in Zookeeper
props.setProperty("offset.storage", ZookeeperOffsetBackingStore.class.getCanonicalName());
props.setProperty(ZookeeperOffsetBackingStore.PROP_CONNECT_STRING.name(), "localhost:2181,localhost:2182,localhost:2183");
props.setProperty(ZookeeperOffsetBackingStore.PROP_SESSION_TIMEOUT_MS.name(), "5000");
props.setProperty(ZookeeperOffsetBackingStore.PROP_CONNECTION_TIMEOUT_MS.name(), "4000");
props.setProperty(ZookeeperOffsetBackingStore.PROP_PATH.name(), "/debezium-offset-storage");

// Store the database schema history in Zookeeper
props.setProperty("database.history", ZookeeperDatabaseHistory.class.getCanonicalName());
props.setProperty(ZookeeperDatabaseHistory.PROP_CONNECT_STRING.name(), "localhost:2181,localhost:2182,localhost:2183");
props.setProperty(ZookeeperDatabaseHistory.PROP_SESSION_TIMEOUT_MS.name(), "5000");
props.setProperty(ZookeeperDatabaseHistory.PROP_CONNECTION_TIMEOUT_MS.name(), "4000");
props.setProperty(ZookeeperDatabaseHistory.PROP_PATH.name(), "/debezium-database-history");
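The same settings can be written with literal keys, which is handy when the configuration lives in a `.properties` file rather than in code. This sketch assumes Debezium 1.x, where `DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING` is `"database.history."`; the prefixes follow from the `Field` definitions in the two classes below. The helper name `ZookeeperStorageConfig` is hypothetical, and the class names are passed in because their package depends on your project.

```java
import java.util.Properties;

/** The Zookeeper storage settings above, spelled out as literal keys (hypothetical helper). */
final class ZookeeperStorageConfig {
    static Properties zookeeperStorage(String offsetStoreClass, String historyClass, String connectString) {
        Properties props = new Properties();
        // Read by ZookeeperOffsetBackingStore: "offset.storage.zookeeper." prefix
        props.setProperty("offset.storage", offsetStoreClass);
        props.setProperty("offset.storage.zookeeper.connect.string", connectString);
        props.setProperty("offset.storage.zookeeper.session.timeout.ms", "5000");
        props.setProperty("offset.storage.zookeeper.connection.timeout.ms", "4000");
        props.setProperty("offset.storage.zookeeper.path", "/debezium-offset-storage");
        // Read by ZookeeperDatabaseHistory: "database.history." + "zookeeper." prefix
        props.setProperty("database.history", historyClass);
        props.setProperty("database.history.zookeeper.connect.string", connectString);
        props.setProperty("database.history.zookeeper.session.timeout.ms", "5000");
        props.setProperty("database.history.zookeeper.connection.timeout.ms", "4000");
        props.setProperty("database.history.zookeeper.path", "/debezium-database-history");
        return props;
    }
}
```

For example, `ZookeeperStorageConfig.zookeeperStorage("com.example.cdc.ZookeeperOffsetBackingStore", "com.example.cdc.ZookeeperDatabaseHistory", "localhost:2181,localhost:2182,localhost:2183")`, where the `com.example.cdc` package is a placeholder.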

ZookeeperDatabaseHistory.java

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.util.FunctionalReadWriteLock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * A {@link DatabaseHistory} implementation that stores the schema history in zookeeper.
 */
@ThreadSafe
public final class ZookeeperDatabaseHistory extends AbstractDatabaseHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperDatabaseHistory.class);

    private static final String CONFIGURATION_FIELD_PREFIX_STRING = DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "zookeeper.";

    public static final Field PROP_CONNECT_STRING = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connect.string")
            .withDescription("list of servers to connect to");

    public static final Field PROP_SESSION_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "session.timeout.ms")
            .withDescription("session timeout");

    public static final Field PROP_CONNECTION_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
            .withDescription("connection timeout");

    public static final Field PROP_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "path")
            .withDescription("node path");

    private String connectString;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private String path;
    private final RetryPolicy defaultRetryPolicy = new ExponentialBackoffRetry(1000, 3);
    private final String defaultChildPathPrefix = "/child";

    private CuratorFramework client;

    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final AtomicBoolean running = new AtomicBoolean();
    private final DocumentWriter writer = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();

    void connect() {
        client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(defaultRetryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .build();
        client.start();

        // wait for client connect
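        try {
            client.blockUntilConnected();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for the Zookeeper connection", e);
        }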
        LOGGER.info("curator state:{}", client.getState().name());

        client.getConnectionStateListenable().addListener((client, state) -> {
            LOGGER.info("stateName:{}", state.name());
            LOGGER.info("isConnected:{}", state.isConnected());
        });
    }

    @Override
    public void configure(Configuration configuration, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
        connectString = configuration.getString(PROP_CONNECT_STRING);
        sessionTimeoutMs = configuration.getInteger(PROP_SESSION_TIMEOUT_MS);
        connectionTimeoutMs = configuration.getInteger(PROP_CONNECTION_TIMEOUT_MS);
        path = configuration.getString(PROP_PATH);

        super.configure(configuration, comparator, listener, useCatalogBeforeSchema);
    }

    @Override
    public void start() {
        super.start();
        LOGGER.info("Starting ZookeeperDatabaseHistory");
        connect();

        lock.write(() -> {
            if (running.compareAndSet(false, true)) {
                if (null == path) {
                    throw new IllegalStateException("ZookeeperDatabaseHistory must be configured before it is started");
                }

                if (!storageExists()) {
                    try {
                        client.create()
                                .creatingParentsIfNeeded()
                                .withMode(CreateMode.PERSISTENT)
                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                                .forPath(path);
                    } catch (Exception e) {
                        throw new DatabaseHistoryException("Unable to create DatabaseHistory node for path:" + path, e);
                    }
                }
            }
        });
    }

    @Override
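    public void stop() {
        // Stop accepting records before closing the client; the write lock waits for any in-flight storeRecord()
        lock.write(() -> {
            running.set(false);
            if (null != client) {
                client.close();
            }
        });
        super.stop();
    }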

    @Override
    protected void storeRecord(HistoryRecord record) {
        if (null == record) {
            return;
        }

        lock.write(() -> {
            if (!running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }

            try {
                String childPath = client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path + defaultChildPathPrefix);

                byte[] bytes = writer.writeAsBytes(record.document());
                client.setData().compressed().withVersion(-1).forPath(childPath, bytes);
            } catch (Exception e) {
                throw new DatabaseHistoryException("Unable to write database history record", e);
            }
        });
    }

    @Override
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        lock.write(() -> {
            try {
                List<String> childPathList = client.getChildren().forPath(path);
                if (null != childPathList) {
                    Collections.sort(childPathList);

                    for (String childPath : childPathList) {
                        childPath = path + "/" + childPath;
                        byte[] bytes = client.getData().decompressed().forPath(childPath);
                        if (null != bytes) {
                            LOGGER.info("[recoverRecords] Read records from childPath:{} dataLength:{}", childPath, bytes.length);
                            if (bytes.length > 0) {
                                records.accept(new HistoryRecord(reader.read(bytes)));
                            }
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Failed to add recover records from history at path:" + path, e);
            }
        });
    }

    public boolean storageExists() {
        try {
            return null != client && null != client.checkExists().forPath(path);
        } catch (Exception e) {
            LOGGER.error("", e);
        }

        return false;
    }

    public boolean exists() {
        return true;
    }
}

ZookeeperOffsetBackingStore.java

import io.debezium.config.Field;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Implementation of OffsetBackingStore that saves data to zookeeper. To ensure this behaves
 * similarly to a real backing store, operations are executed asynchronously on a background thread.
 */
public class ZookeeperOffsetBackingStore extends MemoryOffsetBackingStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperOffsetBackingStore.class);

    private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.zookeeper.";

    public static final Field PROP_CONNECT_STRING = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connect.string")
            .withDescription("list of servers to connect to");

    public static final Field PROP_SESSION_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "session.timeout.ms")
            .withDescription("session timeout");

    public static final Field PROP_CONNECTION_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
            .withDescription("connection timeout");

    public static final Field PROP_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "path")
            .withDescription("node path");

    private String connectString;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private String path;
    private final RetryPolicy defaultRetryPolicy = new ExponentialBackoffRetry(1000, 3);

    private CuratorFramework client;

    void connect() {
        client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(defaultRetryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .build();
        client.start();

        // wait for client connect
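        try {
            client.blockUntilConnected();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for the Zookeeper connection", e);
        }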
        LOGGER.info("curator state:{}", client.getState().name());

        client.getConnectionStateListenable().addListener((client, state) -> {
            LOGGER.info("stateName:{}", state.name());
            LOGGER.info("isConnected:{}", state.isConnected());
        });
    }

    public boolean storageExists() {
        try {
            return null != client && null != client.checkExists().forPath(path);
        } catch (Exception e) {
            LOGGER.error("", e);
        }

        return false;
    }

    @Override
    public void configure(WorkerConfig workerConfig) {
        super.configure(workerConfig);
        Map<String, String> config = workerConfig.originalsStrings();

        connectString = config.get(PROP_CONNECT_STRING.name());
        sessionTimeoutMs = Integer.parseInt(config.get(PROP_SESSION_TIMEOUT_MS.name()));
        connectionTimeoutMs = Integer.parseInt(config.get(PROP_CONNECTION_TIMEOUT_MS.name()));
        path = config.get(PROP_PATH.name());
    }

    @Override
    public synchronized void start() {
        super.start();
        LOGGER.info("Starting ZookeeperOffsetBackingStore with path {}", path);
        connect();

        if (!storageExists()) {
            try {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(path, new byte[]{});
            } catch (Exception e) {
                throw new RuntimeException("Unable to create OffsetBacking node for path:" + path, e);
            }
        }

        load();
    }

    @Override
    public synchronized void stop() {
        super.stop();
        if (null != client) {
            client.close();
        }

        LOGGER.info("Stopped ZookeeperOffsetBackingStore");
    }

    private void load() {
        try {
            Stat stat = client.checkExists().forPath(path);
            if (null == stat) {
                throw new RuntimeException("OffsetBacking node for path:" + path + " could not be found, maybe not created");
            }
            if (stat.getDataLength() == 0) {
                return;
            }

            byte[] bytes = client.getData().decompressed().forPath(path);
            if (null == bytes || bytes.length == 0) {
                return;
            }
            Map<byte[], byte[]> raw = SerializationUtils.deserialize(bytes);
            data = new HashMap<>();

            for (Map.Entry<byte[], byte[]> entry : raw.entrySet()) {
                ByteBuffer key = entry.getKey() != null ? ByteBuffer.wrap(entry.getKey()) : null;
                ByteBuffer value = entry.getValue() != null ? ByteBuffer.wrap(entry.getValue()) : null;
                data.put(key, value);
            }
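        } catch (Exception e) {
            // Fail fast: silently starting with empty offsets would make the connector behave as if it had never run
            throw new RuntimeException("Unable to load offsets from path:" + path, e);
        }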
    }

    @Override
    protected void save() {
        // MemoryOffsetBackingStore#set() already calls save() on its background executor,
        // so write synchronously here: the Future returned by set() then completes only after the write succeeds
        HashMap<byte[], byte[]> raw = new HashMap<>();

        for (Map.Entry<ByteBuffer, ByteBuffer> entry : data.entrySet()) {
            byte[] key = entry.getKey() != null ? entry.getKey().array() : null;
            byte[] value = entry.getValue() != null ? entry.getValue().array() : null;
            raw.put(key, value);
        }

        byte[] bytes = SerializationUtils.serialize(raw);
        try {
            client.setData().compressed().withVersion(-1).forPath(path, bytes);
        } catch (Exception e) {
            throw new RuntimeException("Unable to save offsets to path:" + path, e);
        }
    }
}
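The offset store converts each `ByteBuffer` with `array()`, which returns the entire backing array and ignores the buffer's position and limit (and throws for read-only or direct buffers). In the common case the buffers wrap a whole byte array, so this works, but copying only the readable bytes is the defensive choice. The sketch below (hypothetical class `OffsetCodec`) does that and round-trips the map with plain `ObjectOutputStream`/`ObjectInputStream`, the Java serialization that commons-lang3 `SerializationUtils` also uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Serializes an offset map, copying only each buffer's readable bytes (hypothetical helper). */
final class OffsetCodec {
    static byte[] toBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        // duplicate() shares the content but has its own position, so the caller's buffer is untouched
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    static byte[] serialize(Map<ByteBuffer, ByteBuffer> data) {
        HashMap<byte[], byte[]> raw = new HashMap<>();
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : data.entrySet()) {
            raw.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static Map<ByteBuffer, ByteBuffer> deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) ois.readObject();
            Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
            for (Map.Entry<byte[], byte[]> entry : raw.entrySet()) {
                data.put(entry.getKey() == null ? null : ByteBuffer.wrap(entry.getKey()),
                        entry.getValue() == null ? null : ByteBuffer.wrap(entry.getValue()));
            }
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

`ByteBuffer` equality compares the remaining bytes, so a decoded map equals the original even when an original key was a slice of a larger array.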