こんにちは。Red Hatでソリューションアーキテクトをしている杉本 拓です。前回書いたRed HatのService Registryの記事ではService Registryの概要と基本的な使い方について解説しましたが、今月の赤帽エンジニアブログの記事では、QuarkusのアプリケーションからService RegistryをKafkaと組み合わせて使用する方法についてご紹介したいと思います。なお、本記事の内容は、Using Quarkus applications with Kafka instances and Red Hat OpenShift Service Registryをベースとしています。
事前準備
Service Registryを利用したアプリケーションをビルドを進める前に、以下の事前準備をしておきましょう。
マネージドサービスでの環境の作成
Red Hat Hybrid Cloud Consoleで以下の準備をしておいてください。
- Kafkaインスタンスの作成
- Service Registryインスタンスの作成
- Service Accountの作成 (KafkaとService Registryで共通)
- KafkaインスタンスとService Registryインスタンスでのアクセス権の設定
なお、これまでの赤帽ブログで上記の実施手順についてご紹介していますので、以下の記事もご参照ください。
Kafkaインスタンスのアクセス権の設定画面からManage access
をクリックし、作成したサービスアカウントを使用してConsumer/ProducerそれぞれのアプリケーションアクセスできるようにトピックとConsumer groupに対して、以下のようにアクセス権を設定しておきます。
なお、Service Registryでのサービスアカウントの設定では、使用するサービスアカウントに対してManager権限を付与しておいてください。
ローカルのアプリケーション開発環境
ローカルマシンでアプリケーションをビルドするための環境としては以下を用意しておいてください。
- OpenJDK 11 以上
- Apache Maven 3.8 以上
- IDE (Visual Studio CodeやEclipseなど)
ソースコードのインポート
事前準備ができたら、まずはRed HatのApplication ServicesのGithubリポジトリからソースコードをインポートします。
git clone https://github.com/redhat-developer/app-services-guides app-services-guides
なお、このApplication ServicesのGithubリポジトリには、Red HatのマネージドKafkaやService Registryを利用するためのガイドやサンプルコードがありますので、ぜひ参考にして頂ければと思います。
ここで使用するサンプルコードはcode-examplesフォルダにあるquarkus-service-registry-quickstartという名前のプロジェクトですので、このフォルダをVisual Studio Codeに取り込むと以下のように、KafkaのProducerとConsumerのアプリケーションフォルダが表示されます。
KafkaとService Registryにアクセスするための設定
では次に、このサンプルのQuarkusアプリケーションからKafkaとService Registryにアクセスするための設定をしていきましょう。Red Hatのマネージドサービスとして提供されているKafkaとService Registryにアクセスするには、事前準備のところで用意したKafkaインスタンスとService Registryインスタンスへの接続情報とService Account (KafkaとService Registryで共通)が必要となりますので、Hybrid Cloud Consoleで値を確認してそれぞれの情報を取得しておきましょう。
- <bootstrap_server> : 使用するKafkaインスタンスのBootstrap serverのホスト名
- <service_registry_url> : 使用するService RegistryインスタンスのCore Registry APIのURLのFQDNまで
- <service_registry_core_path> : Service RegistryのCore Registry APIのURLへのパス (通常 "/apis/registry/v2")
- <oauth_token_endpoint_uri> : OAuth認証で使用されるSASL/OAUTHBEARERトークンのエンドポイントURL
- <client_id>: Service AccountのClient ID
- <client_secret>: Service AccountのClient Secret
Producer/Consumerそれぞれのソースコードのフォルダ(src/main/resources)にあるapplication.propertiesファイルには、既に必要な接続情報が環境変数を使用してアプリケーションに渡せるようになっていますので、ターミナルから環境変数を設定しておきます(以下の接続情報はそれぞれ取得した値に変更してから環境変数を設定してください)。
$ export KAFKA_HOST=<bootstrap_server> $ export SERVICE_REGISTRY_URL=<service_registry_url> $ export SERVICE_REGISTRY_CORE_PATH=/apis/registry/v2 $ export RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL=<oauth_token_endpoint_uri> $ export RHOAS_SERVICE_ACCOUNT_CLIENT_ID=<client_id> $ export RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET=<client_secret>
Kafkaトピックの作成
サンプルアプリケーションで使用するKafkaトピックをOpenShift Streams for Apache Kafkaで作成します。Hybrid Cloud ConsoleのKafkaインスタンスにアクセスし、quotes
という名前のトピックを作成しましょう。Partitions数、Message retention、Replicaの値については、ここではデフォルトのままで構いません。
サンプルのQuarkusアプリケーションの実行
このサンプルアプリケーションでは、以下のような構成になっています。
- Consumer (QuotesResourceクラスで実装) : quoteトピックからメッセージを取得し、その値をServer-Sent Eventsを使用してブラウザ画面に表示
- Producer (QuotesProducerクラスで実装) : 5秒間隔でQuote (見積もり) の値をランダムで生成し、quoteトピックにパブリッシュ
接続情報の設定ができたら、以下のコマンドでConsumerアプリケーションを実行してみましょう(環境変数の設定をお忘れなく)。
$ cd ~/code-examples/quarkus-service-registry-quickstart/consumer $ mvn quarkus:dev
Consumerアプリケーションが正常に起動したら、ターミナルは以下のような画面になっているかと思います。
次にブラウザから http://localhost:8080/quotes.html にアクセスすると Quotes の値を表示するようになっています。Consumerアプリケーション起動時にはProducerアプリケーションが起動していないため、値は何も表示されません。
では次に、Consumerアプリケーションを実行したままターミナルをもう一つ起動し、以下のコマンドでProducerアプリケーションを実行してみましょう(環境変数の設定をお忘れなく)。
$ cd ~/code-examples/quarkus-service-registry-quickstart/consumer $ mvn quarkus:dev
Producerアプリケーションが正常に起動すると、ターミナルには以下のように表示されます。
Consumerアプリケーションに含まれているWebアプリケーションにブラウザでアクセスすると、Producerアプリケーションから5秒間隔でQuotesの値が生成された値が、以下のように表示されるのを確認できるかと思います。
Hybrid Cloud ConsoleでService Registryインスタンスを見てみると、サンプルアプリケーションで利用されていたQuotesのスキーマがアーティファクトとして登録されていることが確認できます。
このサンプルアプリケーションでは、Avroフォーマットのスキーマのシリアライズとデシリアライズにおいて、ApicurioのService Registryの機能が利用されており、ProducerとConsumerのアプリケーションのapplication.propertiesでは、それぞれ io.apicurio.registry.serde.avro.AvroKafkaSerializer
と io.apicurio.registry.serde.avro.AvroKafkaDeserializer
が使用されています。このサンプルアプリケーションが実行されたタイミングでQuotesのAvroスキーマがService Registryに登録され、それを元にスキーマに対してルールを設定することができるようになり、スキーマの進化にも対応してくことができるようになります。