Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。
この記事は、Quarkus.io のブログ記事、How to Use Kafka, Schema Registry and Avro with Quarkus の翻訳記事です。
Kafka の世界では、Apache Avro は圧倒的に最も使用されているシリアライゼーションプロトコルです。Avro はデータシリアライズシステムです。Kafka と組み合わせることで、スキーマベースの堅牢で高速なバイナリシリアライズを実現します。
このブログ記事では、Quarkus アプリケーションでスキーマレジストリを使用して Avro を使用する方法を見ていきます。このブログでは、JVM モードを中心に紹介しています。ネイティブモードについては別の記事で取り上げます。
私たちは簡単なアプリケーションを書いていきます。それは HTTP リクエストを受信して、Kafka にペイロードを書き込んで、Kafka から読み出すというアプリです。簡単に言うと、同じアプリケーションが Kafka に書き込んでそこから読むということになります。ですが、明らかに現実世界では別のアプリケーションになるでしょう。
開始方法
では、最初から始めてみましょう。https://code.quarkus.io にアクセスしてプロジェクトを作成し、以下の拡張機能を選択します。
- RESTEasy JSON-B
- SmallRye Reactive Messaging - Kafka Connector
- Apache Avro
プロジェクトをダウンロードして、お好きな IDE で開いてください。
生成された pom.xml
にいくつかのコンテンツを追加する必要があります。 pom.xml
ファイルを開き、以下のdependency
を追加します。
<dependency> <groupId>io.apicurio</groupId> <artifactId>apicurio-registry-utils-serde</artifactId> <version>1.2.2.Final</version> <exclusions> <exclusion> <groupId>org.jboss.spec.javax.interceptor</groupId> <artifactId>jboss-interceptors-api_1.2_spec</artifactId> </exclusion> </exclusions> </dependency>
この依存関係は、Avro シリアライザとデシリアライザを提供します。このシリアライザ/デシリアライザには複数のバージョンがあります。このブログ記事では、Apicurio から提供されたものを使用しています。Confluent のものを使うこともできます(アーティファクトは Maven Central にはないので、追加のリポジトリを追加する必要があります)。
また、avro-maven-plugin
を追加する必要があります。 <build><plugins>
の下に、以下を追加します。
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.9.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>src/main/avro</sourceDirectory> <outputDirectory>${project.build.directory}/generated-sources</outputDirectory> <stringType>String</stringType> </configuration> </execution> </executions> </plugin>
このプラグインは、src/main/avro
ディレクトリにある Avro スキーマ ファイルからコードを生成します。この設定で、ようやくコードを書き始められます。
Avro スキーマ
まず、Kafka で読み書きするオブジェクトを表すスキーマを書く必要があります。以下の内容の src/main/avro/movie.avsc
ファイルを作成します。
{ "namespace": "me.escoffier.quarkus", "type": "record", "name": "Movie", "fields": [ { "name": "title", "type": "string" }, { "name": "year", "type": "int" } ] }
pom.xml
ファイルで設定された avro-maven-plugin
は、title
と year
の属性を持つ me.escoffier.quarkus.Movie
クラスを生成します。クラスを生成するには、以下を実行します。
mvn generate-sources
Movie リソース
最初に書くクラスは HTTP リクエストを受け取り、(Movie の)データをKafkaに書き込みます。 src/main/java/me/escoffier/MovieResource.java
を以下の内容で作成します。
package me.escoffier; import me.escoffier.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.jboss.logging.Logger; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @Path("/movies") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public class MovieResource { private static final Logger LOGGER = Logger.getLogger("MovieResource"); @Inject @Channel("movies") Emitter<Movie> emitter; @POST public Response enqueueMovie(Movie movie) { LOGGER.infof("Sending movie %s to Kafka", movie.getTitle() ); emitter.send(movie); return Response.accepted().build(); } }
この JAX-RS リソースは簡単です。これは単一のエンドポイントメソッドを持ち、/movies
で JSON ペイロードを受信します。RESTEasy は JSON ドキュメントを自動的に Movie
オブジェクトにマッピングします。 avsc
ファイルで説明されているように、期待される JSON には title
と year
の 2 つのフィールドが含まれています。
リアクティブメッセージングで Quarkus を使用する場合、Kafka と直接やりとりすることはありません。あなたは Emitter
を注入します。これはオブジェクト(私たちのmovie)をチャネルに送ります。アプリケーションの設定では、このチャネルを Kafka トピックにマッピングします。
設定といえば、src/main/resources/application.properties
を開いて、以下を追加します。
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api mp.messaging.outgoing.movies.connector=smallrye-kafka mp.messaging.outgoing.movies.topic=movies mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
この設定は少し説明が必要です。まず、mp.messaging.connector.smallrye-kafka.apicurio.registry.url
でスキーマレジストリの URL を設定します。Apicurio の代わりに Confluent のシリアライザ/デシリアライザを使用した場合、mp.messaging.connector.smallrye-kafka.schema.registry.url
という名前のプロパティです。
mp.messaging.outgoing.movies
は movies
チャンネルを設定します。 connector
属性は、SmallRye Kafka コネクタがチャネルを管理していることを示します。 topic
属性はトピックの名前を指定します(これはチャンネル名と一致するので、この場合は省略できます) 。value.serializer
は使用するシリアライザを設定します。ここでは、Apicurio が提供する io.apicurio.registry.utils.serde.AvroKafkaSerializer
を使用します。 registry.*
プロパティは、レジストリがスキーマをどのように扱うかを設定します。
Movie の使用
後半のアプリはさらにシンプルになっています。受信した映画をログに記録するだけです。
src/main/java/me/escoffier/MovieConsumer.java
を以下の内容で作成します。
package me.escoffier; import me.escoffier.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.jboss.logging.Logger; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped public class MovieConsumer { private static final Logger LOGGER = Logger.getLogger("MovieConsumer"); @Incoming("movies-from-kafka") public void receive(Movie movie) { LOGGER.infof("Received movie: %s (%d)", movie.getTitle(), movie.getYear()); } }
@Incoming
アノテーションは、movies-from-kafka
チャネル上を遷移する各 Movie
オブジェクトに対してメソッドが呼び出されることを示しています。今回はケースでは、単純にログメッセージを書きます。
もうすぐそこまで来ています。Kafka からの受信を設定する必要があります。 application.properties
を再度開き、以下を追加します。
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka mp.messaging.incoming.movies-from-kafka.topic=movies mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
これらのプロパティは movies-from-kafka
を movies
Kafka トピックに対応付けます。また、デシリアライザ(io.apicurio.registry.utils.serde.AvroKafkaDeserializer
)の設定も行います。Kafka の自動コミット(enable.auto.commit=false
)を無効にしています。これはリアクティブメッセージングがオフセットコミットを処理してくれるためです。
送信者と受信者が同じアプリケーション内に存在しています。そのため、同じチャネル名を使用することはできません。
ちょっとしたインフラ
アプリケーションを実行する前に必要なのは以下です。
- Kafka のブローカー
- Apicurio のスキーマレジストリ
プロジェクトのルートに以下の内容の docker-compose.yaml
ファイルを作成します。
version: '2' services: zookeeper: image: strimzi/kafka:0.11.3-kafka-2.1.0 command: [ "sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties" ] ports: - "2181:2181" environment: LOG_DIR: /tmp/logs kafka: image: strimzi/kafka:0.11.3-kafka-2.1.0 command: [ "sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" ] depends_on: - zookeeper ports: - "9092:9092" environment: LOG_DIR: "/tmp/logs" KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 schema-registry: image: apicurio/apicurio-registry-mem:1.2.2.Final ports: - 8081:8080 depends_on: - kafka environment: QUARKUS_PROFILE: prod KAFKA_BOOTSTRAP_SERVERS: localhost:9092 APPLICATION_ID: registry_id APPLICATION_SERVER: localhost:9000
この docker-compose
ファイルは必要なものをすべて起動します。Apicurio のレジストリのプロパティが気になる方もいらっしゃるかもしれません。実は Apicurio のレジストリも Quarkus のアプリです。
実行する時間
では、始めましょう。まずは、インフラの整備を一緒に始めましょう。
docker-compose up -d
docker-compose down; docker-compose rm でインフラを停止します。 そして、アプリケーションを起動します。
mvn compile quarkus:dev
一度起動したら、別の端末を開いて映画を投稿します。
curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Shawshank Redemption","year":1994}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Godfather","year":1972}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Dark Knight","year":2008}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"12 Angry Men","year":1957}' \ http://localhost:8080/movies
アプリケーションログでは、以下のように表示されているはずです。
2020-09-11 16:42:22,597 INFO [MovieResource] (executor-thread-1) Sending movie The Shawshank Redemption to Kafka 2020-09-11 16:42:22,619 INFO [MovieResource] (executor-thread-1) Sending movie The Godfather to Kafka 2020-09-11 16:42:22,624 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Shawshank Redemption (1994) 2020-09-11 16:42:22,641 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Godfather (1972) 2020-09-11 16:42:22,644 INFO [MovieResource] (executor-thread-1) Sending movie The Dark Knight to Kafka 2020-09-11 16:42:22,663 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Dark Knight (2008) 2020-09-11 16:42:22,669 INFO [MovieResource] (executor-thread-1) Sending movie 12 Angry Men to Kafka 2020-09-11 16:42:22,688 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: 12 Angry Men (1957)
結論
やりました!数行のコードと少しの設定で、Kafka、Avro、スキーマレジストリを Quarkus アプリケーションに統合できます。このデモのコードは https://github.com/cescoffier/quarkus-kafka-and-avro にあります。readme ファイルには、それを実行するための手順が記載されています。
Quarkus 1.9 では、メッセージングやリアクティブ全般についての新しい機能がたくさん追加されます。ご期待ください!