Quarkusとkafka、スキーマレジストリ、Avroの使い方

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、How to Use Kafka, Schema Registry and Avro with Quarkus の翻訳記事です。


Kafka の世界では、Apache Avro は圧倒的に最も使用されているシリアライゼーションプロトコルです。Avro はデータシリアライズシステムです。Kafka と組み合わせることで、スキーマベースの堅牢で高速なバイナリシリアライズを実現します。

このブログ記事では、Quarkus アプリケーションでスキーマレジストリを使用して Avro を使用する方法を見ていきます。このブログでは、JVM モードを中心に紹介しています。ネイティブモードについては別の記事で取り上げます。

私たちは簡単なアプリケーションを書いていきます。それは HTTP リクエストを受信して、Kafka にペイロードを書き込んで、Kafka から読み出すというアプリです。簡単に言うと、同じアプリケーションが Kafka に書き込んでそこから読むということになります。ですが、明らかに現実世界では別のアプリケーションになるでしょう。

https://quarkus.io/assets/images/posts/kafka-avro/architecture.png

開始方法

では、最初から始めてみましょう。https://code.quarkus.io にアクセスしてプロジェクトを作成し、以下の拡張機能を選択します。

  • RESTEasy JSON-B
  • SmallRye Reactive Messaging - Kafka Connector
  • Apache Avro

https://quarkus.io/assets/images/posts/kafka-avro/project.png

プロジェクトをダウンロードして、お好きな IDE で開いてください。

生成された pom.xml にいくつかのコンテンツを追加する必要があります。 pom.xml ファイルを開き、以下のdependencyを追加します。

<dependency>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-utils-serde</artifactId>
  <version>1.2.2.Final</version>
  <exclusions>
    <exclusion>
      <groupId>org.jboss.spec.javax.interceptor</groupId>
      <artifactId>jboss-interceptors-api_1.2_spec</artifactId>
    </exclusion>
  </exclusions>
</dependency>

この依存関係は、Avro シリアライザとデシリアライザを提供します。このシリアライザ/デシリアライザには複数のバージョンがあります。このブログ記事では、Apicurio から提供されたものを使用しています。Confluent のものを使うこともできます(アーティファクトは Maven Central にはないので、追加のリポジトリを追加する必要があります)。

また、avro-maven-plugin を追加する必要があります。 <build><plugins>の下に、以下を追加します。

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
    <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>src/main/avro</sourceDirectory>
        <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
        <stringType>String</stringType>
      </configuration>
      </execution>
  </executions>
</plugin>

このプラグインは、src/main/avro ディレクトリにある Avro スキーマ ファイルからコードを生成します。この設定で、ようやくコードを書き始められます。

Avro スキーマ

まず、Kafka で読み書きするオブジェクトを表すスキーマを書く必要があります。以下の内容の src/main/avro/movie.avsc ファイルを作成します。

{
  "namespace": "me.escoffier.quarkus",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "year",
      "type": "int"
    }
  ]
}

pom.xml ファイルで設定された avro-maven-plugin は、titleyear の属性を持つ me.escoffier.quarkus.Movie クラスを生成します。クラスを生成するには、以下を実行します。

mvn generate-sources

Movie リソース

最初に書くクラスは HTTP リクエストを受け取り、(Movie の)データをKafkaに書き込みます。 src/main/java/me/escoffier/MovieResource.java を以下の内容で作成します。

package me.escoffier;

import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/movies")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    private static final Logger LOGGER =
        Logger.getLogger("MovieResource");

    @Inject @Channel("movies") Emitter<Movie> emitter;


    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka",
            movie.getTitle()
        );
        emitter.send(movie);
        return Response.accepted().build();
    }

}

この JAX-RS リソースは簡単です。これは単一のエンドポイントメソッドを持ち、/movies で JSON ペイロードを受信します。RESTEasy は JSON ドキュメントを自動的に Movie オブジェクトにマッピングします。 avsc ファイルで説明されているように、期待される JSON には titleyear の 2 つのフィールドが含まれています。

リアクティブメッセージングで Quarkus を使用する場合、Kafka と直接やりとりすることはありません。あなたは Emitter を注入します。これはオブジェクト(私たちのmovie)をチャネルに送ります。アプリケーションの設定では、このチャネルを Kafka トピックにマッピングします。

設定といえば、src/main/resources/application.properties を開いて、以下を追加します。

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api

mp.messaging.outgoing.movies.connector=smallrye-kafka
mp.messaging.outgoing.movies.topic=movies
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

この設定は少し説明が必要です。まず、mp.messaging.connector.smallrye-kafka.apicurio.registry.url でスキーマレジストリの URL を設定します。Apicurio の代わりに Confluent のシリアライザ/デシリアライザを使用した場合、mp.messaging.connector.smallrye-kafka.schema.registry.url という名前のプロパティです。

mp.messaging.outgoing.moviesmovies チャンネルを設定します。 connector 属性は、SmallRye Kafka コネクタがチャネルを管理していることを示します。 topic 属性はトピックの名前を指定します(これはチャンネル名と一致するので、この場合は省略できます) 。value.serializer は使用するシリアライザを設定します。ここでは、Apicurio が提供する io.apicurio.registry.utils.serde.AvroKafkaSerializer を使用します。 registry.* プロパティは、レジストリがスキーマをどのように扱うかを設定します。

Movie の使用

後半のアプリはさらにシンプルになっています。受信した映画をログに記録するだけです。

src/main/java/me/escoffier/MovieConsumer.java を以下の内容で作成します。

package me.escoffier;

import me.escoffier.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private static final Logger LOGGER =
        Logger.getLogger("MovieConsumer");

    @Incoming("movies-from-kafka")
    public void receive(Movie movie) {
        LOGGER.infof("Received movie: %s (%d)",
            movie.getTitle(), movie.getYear());
    }

}

@Incoming アノテーションは、movies-from-kafka チャネル上を遷移する各 Movie オブジェクトに対してメソッドが呼び出されることを示しています。今回はケースでは、単純にログメッセージを書きます。

もうすぐそこまで来ています。Kafka からの受信を設定する必要があります。 application.properties を再度開き、以下を追加します。

mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
mp.messaging.incoming.movies-from-kafka.topic=movies
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

これらのプロパティは movies-from-kafkamovies Kafka トピックに対応付けます。また、デシリアライザ(io.apicurio.registry.utils.serde.AvroKafkaDeserializer)の設定も行います。Kafka の自動コミット(enable.auto.commit=false)を無効にしています。これはリアクティブメッセージングがオフセットコミットを処理してくれるためです。

送信者と受信者が同じアプリケーション内に存在しています。そのため、同じチャネル名を使用することはできません。

ちょっとしたインフラ

アプリケーションを実行する前に必要なのは以下です。

  • Kafka のブローカー
  • Apicurio のスキーマレジストリ

プロジェクトのルートに以下の内容の docker-compose.yaml ファイルを作成します。

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:1.2.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod
      KAFKA_BOOTSTRAP_SERVERS: localhost:9092
      APPLICATION_ID: registry_id
      APPLICATION_SERVER: localhost:9000

この docker-compose ファイルは必要なものをすべて起動します。Apicurio のレジストリのプロパティが気になる方もいらっしゃるかもしれません。実は Apicurio のレジストリも Quarkus のアプリです。

実行する時間

では、始めましょう。まずは、インフラの整備を一緒に始めましょう。

docker-compose up -d

docker-compose down; docker-compose rm でインフラを停止します。 そして、アプリケーションを起動します。

mvn compile quarkus:dev

一度起動したら、別の端末を開いて映画を投稿します。

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  http://localhost:8080/movies

アプリケーションログでは、以下のように表示されているはずです。

2020-09-11 16:42:22,597 INFO  [MovieResource] (executor-thread-1) Sending movie The Shawshank Redemption to Kafka
2020-09-11 16:42:22,619 INFO  [MovieResource] (executor-thread-1) Sending movie The Godfather to Kafka
2020-09-11 16:42:22,624 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Shawshank Redemption (1994)
2020-09-11 16:42:22,641 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Godfather (1972)
2020-09-11 16:42:22,644 INFO  [MovieResource] (executor-thread-1) Sending movie The Dark Knight to Kafka
2020-09-11 16:42:22,663 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Dark Knight (2008)
2020-09-11 16:42:22,669 INFO  [MovieResource] (executor-thread-1) Sending movie 12 Angry Men to Kafka
2020-09-11 16:42:22,688 INFO  [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: 12 Angry Men (1957)

結論

やりました!数行のコードと少しの設定で、Kafka、Avro、スキーマレジストリを Quarkus アプリケーションに統合できます。このデモのコードは https://github.com/cescoffier/quarkus-kafka-and-avro にあります。readme ファイルには、それを実行するための手順が記載されています。

Quarkus 1.9 では、メッセージングやリアクティブ全般についての新しい機能がたくさん追加されます。ご期待ください!

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。