レッドハットでインテグレーションのためのミドルウェア製品のテクニカルサポートを担当している山下です。今回は Jiri Pechanec さんによる「DebeziumとApache Camelのインテグレーションシナリオ 」の翻訳記事です。
今回の記事では、Debeziumによってデータベースからの変更をキャプチャしてKafkaへと流し、CamelによるEIPを通して、Twitterとメールに配信する、というシナリオを説明します。
ところ今回の記事では軽く触れながらも流しているのですが、Kafka Connectランタイム上で Camel のコンポーネントを動作させるプロジェクトが進んでいます。これによってCamelが既に持っている様々なシステムとのアダプタを利用して、容易にKafkaとのデータの出し入れができるようになっていくはずです(もちろんオープンソースで!)。個人的にはこちらも楽しみにしているところ。
[原文] Integration Scenarios with Debezium and Apache Camel by Jiri Pechanec
あんまり翻訳がうまくないのは許してください (^^;
Debeziumの典型的な使用例の1つは、チェンジデータキャプチャ(Change Data Capture)を使用して、レガシーシステムを組織内の他のシステムとインテグレーションすることです。 この目標を達成する方法は複数あります。
- Debeziumを使用してデータをKafkaに書き込み、続いてKafka StreamsパイプラインとKafka Connectコネクターの組み合わせて、他のシステムに変更を配信する
- Javaスタンドアロンアプリケーションで Debezium Embedded engine を使用し、プレーンJavaを使用してインテグレーションコードを記述する;これはたびたび変更イベントをAmazon Kinesis、Google Pub/Subなどの代替メッセージングインフラストラクチャに送信するために使用される
- 既存のインテグレーションフレームワークまたはサービスバスを使用してパイプラインロジックを表現する
この記事では、3番目のオプションである専用のインテグレーションフレームワークに焦点を当てます。
Apache Camel
Camelは、異なるシステムやサービス間でデータの読み取り、変換、ルーティング、書き込みを行えるようにするオープンソースのインテグレーションフレームワークです。
サードパーティシステムへのインターフェイスとなる大量の既製コンポーネントや、EIP : エンタープライズインテグレーションパターン による実装を支援します。
この組み合わせにより、開発者は対象システムに簡単に接続し、宣言的なDSLを使用してインテグレーションパイプラインを表現できます。
Camel と Debezium
Camel 3は2019年末にリリースされ、新しく再設計が行われただけでなく、Debeziumコンポーネントがコードベースに追加されました 。 また、CamelをKafka Connectランタイムでコネクターとして使用できるようにします。 この投稿はDebeziumコンポーネントの使用のみに焦点を当てており、後者のオプションについては今後の投稿で取り上げるつもりです。
- Debezium MySQL Connector Component
- Debezium PostgreSQL Connector Component
- Debezium SQL Server Connector Component
- Debezium MongoDB Connector Component
ご覧のように、すべての非インキュベーション Debeziumコネクタは専用コンポーネントで表されます。 このソリューションの利点は、依存関係を完全に分離し、コネクタインスタンスのタイプセーフな構成としていることです。 内部的にコンポーネントは、Debezium エンベデッドエンジンを含んだCamelイベントドリブンコンシューマを用いて、Debezium エンドポイントを公開します。
Example
例として、StackOverflowのようなシンプルな質問と回答(Q&A)アプリケーションを作成しました。 REST APIを使用すると、新規の質問や、データベースに保存された既存の質問への回答を投稿できます。 アプリケーションによって生成されたデータの変更(たとえば、新規の質問や回答が作成された場合)は、Debeziumを介してキャプチャされ、Camelパイプラインに渡されます。 GitHubでサンプルの完全なソースコードを見つけることができます。
アプリケーションによって生成されたデータの変更(新しい質問または回答が作成された場合など)は、Debeziumによってキャプチャされ、SMTPサーバーを通して電子メールを送信してTwitterアカウントにツイートを投稿するCamelパイプラインに渡されます。完全なソースコードはGitHubにあります。
トポロジ
ソリューショントポロジには複数のコンポーネントがあります:
- Q&AアプリケーションはQuarkusスタックを使用して実装され、REST APIを公開して質問と回答を作成
- アプリケーションはPostgreSQLデータベースにデータを保存
- Camel routeは、組み込みのInfinispanストアを使用してその状態を保持する(質問と回答を集約するオブジェクトの構築に使用)、プレーンなJavaアプリケーションとして実行され、電子メールおよび関連するTwitterアカウントを通して質問に回答
- MailHog 電子メールを送信するためのコンテナで実行されているSMTPサーバー
Q&Aアプリケーション
ソースアプリケーションは、Quarkusに基づくシンプルなRESTサービスです。 PostgreSQLデータベースに保存されている「1:n」リレーションを使用して、「Question」と「Answer」の2つのエンティティを管理します。
エンティティはREST APIを使用して作成され、自動的にそれらの間を関連付けます。
The Camel Pipeline
Camelパイプラインは、以下のビジネスルールを表現します。
- 作成または更新された質問ごとに、質問の作成者にメールを送信します
- 作成または更新されたすべての回答について、質問と回答の作成者の両方にメールを送信します
- 質問の回答が3つに達したら、それに関する専用のTwitterアカウントにツイートを投稿します
このビジネス要件は、このEIP図で記述されたパイプラインに置き換えることができます。
コードの説明
Debezium Camelコンポーネントを使用するには、以下の依存を pom.xml
ファイルに追加する必要があります。
<dependencyManagement> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-bom</artifactId> <version>${version.camel}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- Use required Debezium version --> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-postgres</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-core</artifactId> <version>${version.debezium}</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-debezium-postgres</artifactId> </dependency> </dependencies>
The pipeline logic itself is defined in QaDatabaseUserNotifier class. Its main route looks like tis:
パイプラインロジック自体は QaDatabaseUserNotifierで定義されています。 Camel ルートは以下のようになります。
public class QaDatabaseUserNotifier extends RouteBuilder { @Override public void configure() throws Exception { from("debezium-postgres:localhost?" + "databaseHostname={{database.hostname}}" + "&databasePort={{database.port}}" + "&databaseUser={{database.user}}" + "&databasePassword={{database.password}}" + "&databaseDbname=postgres" + "&databaseServerName=qa" + "&schemaWhitelist={{database.schema}}" + "&tableWhitelist={{database.schema}}.question,{{database.schema}}.answer" + "&offsetStorage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore") .routeId(QaDatabaseUserNotifier.class.getName() + ".DatabaseReader") // ** #1 ** .log(LoggingLevel.DEBUG, "Incoming message ${body} with headers ${headers}") .choice() // ** #2 ** .when(isQuestionEvent) .filter(isCreateOrUpdateEvent) // ** #3 ** .convertBodyTo(Question.class) // ** #4 ** .log(LoggingLevel.TRACE, "Converted to logical class ${body}") .bean(store, "readFromStoreAndUpdateIfNeeded") // ** #5 ** .to(ROUTE_MAIL_QUESTION_CREATE) // ** #6 ** .endChoice() .when(isAnswerEvent) .filter(isCreateOrUpdateEvent) .convertBodyTo(Answer.class) .log(LoggingLevel.TRACE, "Converted to logical class ${body}") .bean(store, "readFromStoreAndAddAnswer") .to(ROUTE_MAIL_ANSWER_CHANGE) .filter(hasManyAnswers) // ** #7 ** .setBody().simple("Question '${exchangeProperty[aggregate].text}' has " + "many answers (generated at " + Instant.now() + ")") .to(TWITTER_SERVER) .end() .endChoice() .otherwise() .log(LoggingLevel.WARN, "Unknown type ${headers[" + DebeziumConstants.HEADER_IDENTIFIER + "]}") .endParent(); from(ROUTE_MAIL_QUESTION_CREATE) // ** #6 ** .routeId(QaDatabaseUserNotifier.class.getName() + ".QuestionNotifier") .setHeader("To").simple("${body.email}") .setHeader("Subject").simple("Question created/edited") .setBody().simple("Question '${body.text}' was created or edited") .to(SMTP_SERVER); } @Converter public static class Converters { @Converter public static Question questionFromStruct(Struct struct) { // ** #4 ** return new Question(struct.getInt64("id"), struct.getString("text"), struct.getString("email")); } @Converter public static Answer answerFromStruct(Struct struct) { // ** #4 ** return new Answer(struct.getInt64("id"), struct.getString("text"), struct.getString("email"), struct.getInt64("question_id")); } } }
説明は以下のとおりです。
#1 | from はDebeziumソースエンドポイントです。 URIパーツは、コネクター構成オプションに直接マップします |
#2 | パイプラインロジックは、変更イベントタイプに応じてスプリットされます。これはソーステーブルの識別子( <server_name>。<schema_name>。<table_name> )を含んだ CamelDebeziumIdentifier のヘッダーに基づきます |
#3 | パイプラインは、更新と削除のみを処理できるようになりました。これはメッセージEnvelope のop フィールドを含むCamelDebeziumOperation ヘッダーに基づきます |
#4 | Kafka ConnectのStruct 型は、パイプラインで使用される論理型に変換されます。変換は、カスタムのCamel コンバーターによって実行されます。手軽に利用できるDebeziumTypeConverter を使用してStruct をMap に変換することもできますが、しかしこれはパイプラインロジックがテーブル構造に強く結合してしまいます。 |
#5 | メッセージストア と通信するルートが呼び出されます。Infinispanキャッシュに基づいてメッセージの集約を作成します。メッセージストアは、質問がすでに保存されているかどうかを確認します。新しい集約が作成されてない場合には保存し、そうでない場合には保存されている集約が新しいデータで更新されます。 |
#6 | メールメッセージをフォーマットして、SMTPエンドポイントを介して質問作成者にメール配信するルートが呼び出されます。 |
#7 | 回答メッセージタイプに関連するルート部分は非常に似ています(回答は質問集約に追加されます)。主な違いは、集約に3つの回答が含まれる場合のTwitterメッセージの投稿です。 |
補足ですが、シンプルな例とするために、揮発的なメモリを使用してDebeziumオフセットを格納しています。 永続ストレージの場合にはファイルベースのオフセットストアを使用するか、Infinispanによるカスタムオフセットストア実装を作成してキャッシュにオフセットの保存を任せることもできます。
デモ
デモを実行するには、適切なAPIキーとシークレットを備えたTwitter 開発者アカウントが必要です。 アプリケーションディレクトリに移動して、すべてのコンポーネントをビルドしてください。
$ mvn clean install
サービスを開始します(独自のTwitter API資格情報を提供します):
$ env TWITTER_CONSUMER_KEY=<...> TWITTER_CONSUMER_SECRET=<...> TWITTER_ACCESS_TOKEN=<...> TWITTER_ACCESS_TOKEN_SECRET=<...> docker-compose up
別のターミナルで質問とそれに3つの回答を作成します:
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/ -d @src/test/resources/messages/create-question.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer1.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer2.json
$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer3.json
Twitterアカウントには、『質問「犬にはいくつの足がありますか?』などのテキストを含む新しいツイートが含まれているはずです。また、[MailHogサーバーUI](http:// localhost:8025 /)には次のようなメッセージが表示されます。
まとめ
Apache Camelは、システムインテグレーションシナリオを実装するための非常に興味深いオプションです。外部のメッセージングインフラストラクチャを必要とせずに、Debeziumコンポーネントを使用してスタンドアロンのCamelルートを展開するのは非常に簡単で、データの変更をキャプチャし、複雑なルーティングおよび変換操作を実行できます。 Camelは、複雑なサービスオーケストレーションに含まれる可能性のあるさまざまなシステム用の100を超えるコネクタだけでなく、EIP : エンタープライズインテグレーションパターンの完全な実装を開発者に提供します。
今回の例の完全なソースコードはGitHubから入手できます。
Debeziumについて
Debeziumは、既存のデータベースをイベントストリームに変換するオープンソースの分散プラットフォームです。アプリケーションは、データベース内にコミットされた行レベルの変更をほとんど瞬時に確認して反応することができます。 DebeziumはKafkaの上に構築され、特定のデータベースを監視するKafka Connect互換コネクタを提供します。Debeziumはデータ変更の履歴をKafkaログに記録するため、アプリケーションはいつでも停止や再起動することができ、実行中に見逃したすべてのイベントを簡単に消費できるため、すべてのイベントを正しく完全に処理することができます。 DebeziumはApache License、Version 2.0でありオープンソースです。
参加しよう
Debeziumに興味を持ち試してみてほしいと思います。 Twitter @debezium、Gitterでチャット、またはメーリングリスト でコミュニティと話すことができます。 すべてのコードはGitHubでオープンソースです。したがって、コードをローカルでビルドし、既存のコネクタを改善し、さらにコネクタを追加できます。 問題を発見したり、Debeziumの改善方法についてアイデアをお持ちの場合は、お知らせいただくか問題を記録してください。
Jiri Pechanec
Jiriは、Red Hatのソフトウェア開発者(そして元QAエンジニア)です。 Javaおよびシステムインテグレーションプロジェクトとそのタスクで多くのキャリアを積んできました。チェコ共和国のブルノの近くに住んでいます。 [github] [linkedin]