Red HatのManaged KafkaにQuarkusアプリケーションで接続してみよう!

こんにちは。Red Hatでソリューションアーキテクトをしている杉本です。先日の赤帽のブログ記事では、Red Hatが提供するManaged Kafka、Red Hat OpenShift Streams for Apache Kafka (RHOSAK)を試してみる方法として、Kafkaの環境を数分で構築し、ローカルマシンのkafkacatからクラウド上に構築されたKafkaインスタンスにアクセスする手順を書きました。今回はQuarkusベースのKafkaクライアントからRed HatのManaged Kafka (RHOSAK) に接続する方法について書いていきたいと思います。

Quarkusのサンプルアプリケーション

まず、OpenShift Streams for Apache Kafka のガイドやサンプルコードが格納されているGitリポジトリからQuarkusのサンプルアプリケーションをダウンロードしていきます。以下のコマンドでサンプルのソースコードをクローンしてみましょう。

git clone https://github.com/redhat-developer/app-services-guides app-services-guides

ローカルへのコピーが完了したら、app-services-guidesの下にあるcode-examples/quarkus-kafka-quickstartフォルダをIDEで開いてコードを確認してみましょう。ここで使用するquarkus-kafka-quickstartのQuarkusアプリケーションは以下の3つのソースコードから構成されています。

  • PriceConverter.java
  • PriceGenerator.java
  • PriceResource.java

アーキテクチャとしては以下の図のように、Price Generatorコンポーネントがランダムな価格 (price) を生成するメッセージのプロデューサーとなり、価格をKafkaトピック (prices) に送信し、Price Converterコンポーネントが コンシューマーとなってpricesトピックからメッセージを取得します。

f:id:tsugimot:20210916162646p:plain
図1 アーキテクチャ

ブラウザがクライアントとして5秒間隔で更新される価格情報を以下のように表示しますが、仕組みとしては、Price Resourceコンポーネントがブラウザからのリクエストを受け付け、SSE (Server-Sent Events) を使ってPrice Converterコンポーネントから受け取った価格情報をブラウザに返して画面を更新しています。

f:id:tsugimot:20210916162758p:plain
図2 ブラウザ画面1

このアプリケーションの詳細については、Quarkusのサイトのドキュメントをご参照ください。

なお、QuarkusとKafkaのメッセージの送受信には SmallRye Reactive Messaging フレームワークとそれに含まれるKafkaコネクター (smallrye-kafka) が使用されています。このフレームワークは Eclipse MicroProfile Reactive Messaging の仕様を実装したものとなっており、以下のようにCDIのアノテーション (@Outgoing/@Incoming) でメッセージの送受信を行うチャネル名を指定します(チャネルはMicroProfile Reactive Messagingの仕組みの中でメッセージの送受信で使われるもので、必ずしもKafkaトピックとは限りません)。

PriceGenerator.java

public class PriceGenerator {
    ...
    @Outgoing("generated-price")

PriceConverter.java

public class PriceConverter {
    ...
    @Incoming("prices")
    @Outgoing("my-data-stream")

接続先となるKafkaトピックは以下のようにapplication.propertiesの中で指定されています (*.topicプロパティ)。outgoingとincomingの後に記述されているのがアノテーションで指定されているチャネル名です。

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
RHOSAKにアクセスする場合の認証方式

KafkaクライアントであるQuarkusアプリケーションからRHOSAKのKafkaインスタンスにアクセスする場合の認証方法として、以下の2つがサポートされています。

  • SASL/PLAIN: クライアントアプリケーションの認証にサービスアカウントのクレデンシャル(Client IDとClient secret)をユーザ名/パスワードとして使用し、Kafkaインスタンスに接続します。

  • SASL/OAUTHBEARER: RHOSAKはKafkaが提供するSASL/OAUTHBEARERを通じてOAuth2を使用した認証・認可をサポートしており、Kafkaクライアントの認証にサービスアカウントのクレデンシャルとトークンエンドポイントURLを使用してKafkaインスタンスに接続します。

RHOSAKでは後者のSASL/OAUTHBEARERを使用する方式を推奨しており、このサンプルアプリケーションでもSASL/OAUTHBEARERを使用するための設定をおこなっていきます。

このKafkaのクライアントアプリケーションがRHOSAKにアクセスできるようにするためには、以下の情報が必要となります。

  • Bootstrap Serverのエンドポイント
  • Kafka用に作成されたサービスアカウントのクレデンシャル (Client IDとClient secret)
  • OAuthのトークンエンドポイント

このサンプルアプリケーションではこれらの設定情報の環境変数がapplication.propertiesの中で設定されています。

Bootstrap ServerのエンドポイントとOAuthのトークンエンドポイントの値については、RHOSAKのWebコンソールで、Streams for Apache Kafka > Kafka Instancesのページを開き、そこに表示されるKafkaインスタンスを選択して接続情報を表示すると確認することができます。ここでは、以前作成したKafkaインスタンスと同じ名前の my-kafka-instance を使用しています。

f:id:tsugimot:20210916163139p:plain
図3 接続情報画面

Kafka用に作成したサービスアカウントのクレデンシャル (Client IDとClient secret) の作成方法は以前の赤帽ブログの記事に作成の仕方が書かれていますので、もしまだ作成されていない場合には以前のブログ記事を参考にして作成してください。

上記の値が取得できたら、以下のコマンドで環境変数として設定しておきましょう。それぞれの値は実際の値に置き換えてください。

$ export BOOTSTRAP_SERVER=<bootstrap_server>
$ export CLIENT_ID=<client_id>
$ export CLIENT_SECRET=<client_secret>
$ export OAUTH_TOKEN_ENDPOINT_URI=<oauth_token_endpoint_uri>

このQuarkusサンプルアプリケーションでは、SASL/OAUTHBEARERを使用してKafkaインスタンスに接続するために、application.propertiesにおいて以下のプロパティが設定済みとなっています (devプロファイルを使用しています)。

%dev.kafka.bootstrap.servers=${BOOTSTRAP_SERVER}
%dev.kafka.security.protocol=SASL_SSL
%dev.kafka.sasl.mechanism=OAUTHBEARER
%dev.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="${CLIENT_ID}" \
  oauth.client.secret="${CLIENT_SECRET}" \
  oauth.token.endpoint.uri="${OAUTH_TOKEN_ENDPOINT_URI}" ;
%dev.kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

QuarkusアプリケーションでのKafkaに関するプロパティの設定については、こちらのリファレンスガイド

Quarkus - Apache Kafka Reference Guide

に詳細な情報がありますので、アプリケーションを開発する際にはぜひ参考にしてください。

サンプルアプリケーション用のKafkaトピックの作成

このQuarkusのサンプルアプリケーションでは、pricesという名前のKafkaトピックを使用していますので、RHOSAKのKafkaインスタンスにその名前のトピックを作成しておきましょう。

Kafka Instancesのページで、先ほど作成したKafkaインスタンスの名前をクリックします。Topicsタブで、[Create topic]をクリックし、ガイドに従ってトピックに関する情報を設定していきます。ここではトピック名を prices とし、あとはデフォルトの値のまま設定をしていきます。

f:id:tsugimot:20210916163315p:plain
図4 トピック作成画面1

設定が完了すると、以下のようにトピックの一覧に prices トピックが作成されます。

f:id:tsugimot:20210916163333p:plain
図5 トピック作成画面2

サンプルアプリケーションの実行

では、ダウンロードしたQuarkusのサンプルアプリケーションを実行してみましょう。上のステップで環境変数を設定したら、以下のコマンドでQuarkusアプリケーションをデベロッパーモードで起動してください。

$ cd ~/code-examples/quarkus-kafka-quickstart
$ ./mvnw quarkus:dev

正常に起動すると以下のような画面が表示されます。

f:id:tsugimot:20210916163358p:plain
図6 コマンド実行画面

ブラウザで http://localhost:8080/prices.html にアクセスすると、以下の画面のように、価格情報が5秒間隔で更新されるのが確認できると思います。

f:id:tsugimot:20210916163430p:plain
図7 ブラウザ画面2

動作が確認できたらぜひこのサンプルアプリケーションのソースコードをもう一度よく確認してみることをおすすめします。

まとめ

このように、RHOSAKを使用することでKafkaの環境を簡単に構築することができるだけでなく、Kafkaを利用するリモートのクライアントアプリケーションからも簡単に接続させることができるようになります。さらにQuarkusにはKafkaのクライアントとなるリアクティブなアプリケーションを効率的に開発するための機能が用意されており、Quarkusのサイトでは日本語の情報も多く提供されているので、ぜひQuarkusを使った開発にもトライしてみることをおすすめします。

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。