DBへの変更をイベントストリームに変換するOSS: Debeziumの紹介とチュートリアル

Red Hatの須江です。

本記事は赤帽エンジニアAdvent Calendar 2018の6日目です。 (子供の寝かしつけで一緒に寝落ちして日付が変わってしまいましたが12/6分です。。。)

Debeziumとは?

DebeziumはDBに対するデータ操作をキャプチャしてイベントストリームに変換してくれる分散プラットフォームです。Apache Kafkaをベースに実装されており、Kafka Connectベースのコネクタを利用してDBをモニターすることができます。 既存DBに対するデータ操作をKafkaのメッセージに変換することができますので、レガシーシステムなどでアプリケーションに手を入れたくないけどデータだけ抜き出してリアルタイム処理したいとか、別システムにデータを流用したいという場合に便利なソフトウェアです。 また、イベントログはKafkaが保持してくれているので、データを利用する側(consumer)が止まってもイベントログが失われることはなく、consumerを復旧させれば処理を再開することが可能です。

詳細情報

https://debezium.io/

github.com

Debeziumチュートリアル

Debeziumを使うには Zookeeper / Kafka / Debezium Connectorの3つのコンポーネントを動かす必要があります(ZookeeperはKafkaを動かすために必要)。 プロダクション環境であれば全コンポーネントを複数プロセスで構成して分散処理を行うようにしますが、動作を理解するためにまずはすべてのコンポーネントをシングルプロセスで動かしてみましょう。 詳しい手順と解説は公式サイトのチュートリアルを参照してみてください。

Tutorial for Debezium 0.8

環境構成

すべてのプロセスをDockerコンテナとして起動します。 最終的には次のような感じになります。

f:id:nobusue:20181207043505p:plain

Dockerは適当に用意してください。自分の環境は以下の通りでした。

$ docker version
Client: Docker Engine - Community
 Version:           18.09.0
 API version:       1.39
 Go version:        go1.10.4
 Git commit:        4d60db4
 Built:             Wed Nov  7 00:47:43 2018
 OS/Arch:           darwin/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          18.09.0
  API version:      1.39 (minimum version 1.12)
  Go version:       go1.10.4
  Git commit:       4d60db4
  Built:            Wed Nov  7 00:55:00 2018
  OS/Arch:          linux/amd64
  Experimental:     true

セットアップ

必要なコンテナを順次起動してからConnectorの設定を行います。 ログを確認しやすくするため、コンテナ毎にターミナルウィンドウを開き、フォアグラウンドで動かします。

(1)Zookeeper起動
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.8
(2)Kafka起動
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.8
(3)MySQL起動

変更を監視する対象としてMySQLを起動しておきます。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8

(4)MySQL CLIを起動

MySQLを操作するためにCLIを起動しておきます。

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

初期状態では以下のようになっています。

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
5 rows in set (0.00 sec)

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
(5)Kafka Connect起動

MySQLを監視するConnectorを起動します。8083番ポートを開けているのはREST APIで操作を行うためです。

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.8

以下を実行して、Kafka ConnectのREST APIが動作するか確認しておきます。

$ curl -H "Accept:application/json" localhost:8083/
{"version":"1.1.0","commit":"fdcf75ea326b8e07","kafka_cluster_id":"UPPe8OIjQfCL3e9W51MGZw"}

$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
MySQLのモニタリング設定

ConnectorのREST APIからDBやKafkaに接続するために必要な情報を設定します。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

以下のようなレスポンスが返ってくれば無事設定できています。(JSONは整形しています。以下同様)

HTTP/1.1 201 Created
Date: Thu, 06 Dec 2018 10:41:15 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 483
Server: Jetty(9.2.24.v20180105)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector"
  },
  "tasks": [],
  "type": null
}

念の為、正しく設定されているか確認してみます。

$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]

$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

HTTP/1.1 200 OK
Date: Thu, 06 Dec 2018 10:43:31 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.2.24.v20180105)

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "debezium",
    "database.server.id": "184054",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.password": "dbz",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.port": "3306"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ],
  "type": "source"
}

OKですね!

このとき、connectorコンテナ側には「登録できたよ!」というログが出ているはずなので、興味のある方は確認してみてください。

Debeziumの動作を確認

セットアップ大変でしたね。。。この後は簡単です。

(6)Watcher起動

Kafkaにメッセージが投入されたことを確認するためにwatcher(console consumer)を起動しておきます。 Debeziumのデフォルトでは DB名.スキーマ名.テーブル名 というtopicにメッセージが投入されるようになっていますので、ここでは customers テーブルに対する変更を監視するようにしています。

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.8 watch-topic -a -k dbserver1.inventory.customers

watcherを起動すると、既にいくつかメッセージが投入されていることが確認できます。

WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Contents of topic dbserver1.inventory.customers:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.8.3.Final","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1544092878919}}

MySQL Connectorはbinlogを解析してイベントに変換するように実装されていますが、初期接続時にはテーブル作成時からのすべての変更を読み取るため、このような動作をします。 Connectorを再起動するなどして再接続した場合、前回読み取り済みのログの続きから読み取るようになっています。

興味のある方はSnapshotReaderのソースを読み解いてみて下さい。 https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java

データを更新してみる

UPDATEを実行してデータを更新してみます。

mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

すると、watcher側で以下のイベントログが観測できるはずです。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1004
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1544093538,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 364,
      "row": 0,
      "snapshot": false,
      "thread": 2,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "u",
    "ts_ms": 1544093538316
  }
}

payloadのbeforeとafterを見ると、確かにUPDATE前と後の値が確認できます。 今回は単にコンソール表示しただけですが、データ変更の全容がここに含まれていますので、この情報を別アプリケーションで再利用することが可能です。

Connectorを止めてみる

Connectorを再起動しても、その間の変更がロストしないことを確認してみます。

一旦Connectorを停止します。

docker stop connect

Connectorが停止した状態でデータをINSERTします。

mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
Query OK, 1 row affected (0.01 sec)

mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
Query OK, 1 row affected (0.01 sec)

Watcherにはログが出ていないはずです。 (Kafkaにメッセージが届いていないので当然ですが。。)

では、Connectorをおもむろに起動します。

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.8

すると、変更監視が再開され、Watcherに以下ログが出力されます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1005
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "Sarah",
      "last_name": "Thompson",
      "email": "kitt@acme.com"
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1544093884,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 725,
      "row": 0,
      "snapshot": false,
      "thread": 2,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "c",
    "ts_ms": 1544093957590
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1006
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1006,
      "first_name": "Kenneth",
      "last_name": "Anderson",
      "email": "kander@acme.com"
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "dbserver1",
      "server_id": 223344,
      "ts_sec": 1544093890,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 1035,
      "row": 0,
      "snapshot": false,
      "thread": 2,
      "db": "inventory",
      "table": "customers",
      "query": null
    },
    "op": "c",
    "ts_ms": 1544093957599
  }
}

ちゃんとINSERT2件が捕捉されていますね!

他にもいろいろなパターンで試してみて下さい。

後始末

以下コマンドで全コンテナを停止します。(関係ないコンテナが巻き込まれないように注意してください。)

docker stop $(docker ps -aq)

チュートリアルのショートカット

ちまちまコマンドを打つのが面倒な方向けに、Docker Composeで一発起動できるサンプルが提供されていますのでご利用下さい。

https://github.com/debezium/debezium-examples/tree/master/tutorial

まとめ

DebeziumとKafkaの組み合わせによって、アプリケーションにほぼ影響なしで、DBに対する更新を横取りして再利用することができることを確認しました。 Kafkaという汎用的なメッセージバスに乗せることで、単純なデータレプリケーションだけでなく、同じメッセージストリームを複数の目的で再利用することができます。 レガシーマイグレーションの手段の一つとして、いろいろ使い所がありそうですね。

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。