Red Hatの須江です。
本記事は赤帽エンジニアAdvent Calendar 2018の6日目です。 (子供の寝かしつけで一緒に寝落ちして日付が変わってしまいましたが12/6分です。。。)
Debeziumとは?
DebeziumはDBに対するデータ操作をキャプチャしてイベントストリームに変換してくれる分散プラットフォームです。Apache Kafkaをベースに実装されており、Kafka Connectベースのコネクタを利用してDBをモニターすることができます。 既存DBに対するデータ操作をKafkaのメッセージに変換することができますので、レガシーシステムなどでアプリケーションに手を入れたくないけどデータだけ抜き出してリアルタイム処理したいとか、別システムにデータを流用したいという場合に便利なソフトウェアです。 また、イベントログはKafkaが保持してくれているので、データを利用する側(consumer)が止まってもイベントログが失われることはなく、consumerを復旧させれば処理を再開することが可能です。
詳細情報
Debeziumチュートリアル
Debeziumを使うには Zookeeper / Kafka / Debezium Connectorの3つのコンポーネントを動かす必要があります(ZookeeperはKafkaを動かすために必要)。 プロダクション環境であれば全コンポーネントを複数プロセスで構成して分散処理を行うようにしますが、動作を理解するためにまずはすべてのコンポーネントをシングルプロセスで動かしてみましょう。 詳しい手順と解説は公式サイトのチュートリアルを参照してみてください。
環境構成
すべてのプロセスをDockerコンテナとして起動します。
最終的には次のような感じになります。

Dockerは適当に用意してください。自分の環境は以下の通りでした。
$ docker version Client: Docker Engine - Community Version: 18.09.0 API version: 1.39 Go version: go1.10.4 Git commit: 4d60db4 Built: Wed Nov 7 00:47:43 2018 OS/Arch: darwin/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 18.09.0 API version: 1.39 (minimum version 1.12) Go version: go1.10.4 Git commit: 4d60db4 Built: Wed Nov 7 00:55:00 2018 OS/Arch: linux/amd64 Experimental: true
セットアップ
必要なコンテナを順次起動してからConnectorの設定を行います。 ログを確認しやすくするため、コンテナ毎にターミナルウィンドウを開き、フォアグラウンドで動かします。
(1)Zookeeper起動
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.8
(2)Kafka起動
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.8
(3)MySQL起動
変更を監視する対象としてMySQLを起動しておきます。
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
(4)MySQL CLIを起動
MySQLを操作するためにCLIを起動しておきます。
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
初期状態では以下のようになっています。
mysql> show tables; +---------------------+ | Tables_in_inventory | +---------------------+ | addresses | | customers | | orders | | products | | products_on_hand | +---------------------+ 5 rows in set (0.00 sec) mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
(5)Kafka Connect起動
MySQLを監視するConnectorを起動します。8083番ポートを開けているのはREST APIで操作を行うためです。
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.8
以下を実行して、Kafka ConnectのREST APIが動作するか確認しておきます。
$ curl -H "Accept:application/json" localhost:8083/
{"version":"1.1.0","commit":"fdcf75ea326b8e07","kafka_cluster_id":"UPPe8OIjQfCL3e9W51MGZw"}
$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
MySQLのモニタリング設定
ConnectorのREST APIからDBやKafkaに接続するために必要な情報を設定します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
以下のようなレスポンスが返ってくれば無事設定できています。(JSONは整形しています。以下同様)
HTTP/1.1 201 Created
Date: Thu, 06 Dec 2018 10:41:15 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 483
Server: Jetty(9.2.24.v20180105)
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"name": "inventory-connector"
},
"tasks": [],
"type": null
}
念の為、正しく設定されているか確認してみます。
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
HTTP/1.1 200 OK
Date: Thu, 06 Dec 2018 10:43:31 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.2.24.v20180105)
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "debezium",
"database.server.id": "184054",
"tasks.max": "1",
"database.hostname": "mysql",
"database.password": "dbz",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"name": "inventory-connector",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.port": "3306"
},
"tasks": [
{
"connector": "inventory-connector",
"task": 0
}
],
"type": "source"
}
OKですね!
このとき、connectorコンテナ側には「登録できたよ!」というログが出ているはずなので、興味のある方は確認してみてください。
Debeziumの動作を確認
セットアップ大変でしたね。。。この後は簡単です。
(6)Watcher起動
Kafkaにメッセージが投入されたことを確認するためにwatcher(console consumer)を起動しておきます。
Debeziumのデフォルトでは DB名.スキーマ名.テーブル名 というtopicにメッセージが投入されるようになっていますので、ここでは customers テーブルに対する変更を監視するようにしています。
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.8 watch-topic -a -k dbserver1.inventory.customers
watcherを起動すると、既にいくつかメッセージが投入されていることが確認できます。
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Contents of topic dbserver1.inventory.customers:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.8.3.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1544092878919}}
MySQL Connectorはbinlogを解析してイベントに変換するように実装されていますが、初期接続時にはテーブル作成時からのすべての変更を読み取るため、このような動作をします。 Connectorを再起動するなどして再接続した場合、前回読み取り済みのログの続きから読み取るようになっています。
興味のある方はSnapshotReaderのソースを読み解いてみて下さい。 https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
データを更新してみる
UPDATEを実行してデータを更新してみます。
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
すると、watcher側で以下のイベントログが観測できるはずです。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1004
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "0.8.3.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1544093538,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": false,
"thread": 2,
"db": "inventory",
"table": "customers",
"query": null
},
"op": "u",
"ts_ms": 1544093538316
}
}
payloadのbeforeとafterを見ると、確かにUPDATE前と後の値が確認できます。 今回は単にコンソール表示しただけですが、データ変更の全容がここに含まれていますので、この情報を別アプリケーションで再利用することが可能です。
Connectorを止めてみる
Connectorを再起動しても、その間の変更がロストしないことを確認してみます。
一旦Connectorを停止します。
docker stop connect
Connectorが停止した状態でデータをINSERTします。
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com"); Query OK, 1 row affected (0.01 sec) mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com"); Query OK, 1 row affected (0.01 sec)
Watcherにはログが出ていないはずです。 (Kafkaにメッセージが届いていないので当然ですが。。)
では、Connectorをおもむろに起動します。
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.8
すると、変更監視が再開され、Watcherに以下ログが出力されます。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1005
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "Sarah",
"last_name": "Thompson",
"email": "kitt@acme.com"
},
"source": {
"version": "0.8.3.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1544093884,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 725,
"row": 0,
"snapshot": false,
"thread": 2,
"db": "inventory",
"table": "customers",
"query": null
},
"op": "c",
"ts_ms": 1544093957590
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1006
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1006,
"first_name": "Kenneth",
"last_name": "Anderson",
"email": "kander@acme.com"
},
"source": {
"version": "0.8.3.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1544093890,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1035,
"row": 0,
"snapshot": false,
"thread": 2,
"db": "inventory",
"table": "customers",
"query": null
},
"op": "c",
"ts_ms": 1544093957599
}
}
ちゃんとINSERT2件が捕捉されていますね!
他にもいろいろなパターンで試してみて下さい。
後始末
以下コマンドで全コンテナを停止します。(関係ないコンテナが巻き込まれないように注意してください。)
docker stop $(docker ps -aq)
チュートリアルのショートカット
ちまちまコマンドを打つのが面倒な方向けに、Docker Composeで一発起動できるサンプルが提供されていますのでご利用下さい。
https://github.com/debezium/debezium-examples/tree/master/tutorial
まとめ
DebeziumとKafkaの組み合わせによって、アプリケーションにほぼ影響なしで、DBに対する更新を横取りして再利用することができることを確認しました。 Kafkaという汎用的なメッセージバスに乗せることで、単純なデータレプリケーションだけでなく、同じメッセージストリームを複数の目的で再利用することができます。 レガシーマイグレーションの手段の一つとして、いろいろ使い所がありそうですね。