Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。
この記事は、Quarkus.io のブログ記事、Combining Apache Kafka and the Rest client の翻訳記事です。
今週もまた面白い質問がありました。今週は、Kafka と Rest Client の組み合わせについて誰かに聞かれました。それは繰り返しになりますが、ほとんどの場合、次のようなプロセスを目標にしています。
つまり、受信した Kafka メッセージごとにリモートサービスを呼び出したいのです。そのため、読み取りしているデータを含む最初のトピック(「in」)、例えば「トランザクション」があります。次に、アーキテクチャの中心的な部分があります。それは処理コンポーネント(図のProcessing Compornent)です。これは、着信したトランザクションを読み取り、それらのそれぞれについて、リモートサービスを呼び出します。また、それは(リモートサービスによって生成された)レスポンスを別の Kafka トピック "out" に書き込みます。
Quarkus でこれを実装するのは簡単です。この記事ではそれについて説明します。Reactive Messaging と Rest Client のおかげで、これには 20 行以上のコードを必要としません。
リモートサービス
まずはリモートサービスから始めてみましょう。Quarkus には、リモート HTTP サービスを呼び出すための複数の方法があります。ですが、HTTP の低レベルの詳細を処理することなく HTTP サービスと対話する優れた方法を提供しているので、Rest Client を使用してみましょう。
みなさんは HTTP API を使うこともできます。ですが、シンプルに言うと、簡単なリモートサービスを考えてみましょう。次のようになります。
@RegisterRestClient(configKey = "transaction-service") @Produces(MediaType.APPLICATION\_JSON) @Consumes(MediaType.APPLICATION\_JSON) public interface TransactionService { @Path("/transactions") @POST TransactionResult postSync(Transaction transaction); @Path("/transactions") @POST Uni<TransactionResult> postAsync(Transaction transaction); }
このサービスには、同じ HTTP エンドポイントを呼び出す 2 つのメソッドが含まれています。最初のものは同期的なものです。そのため、応答を受け取るまで呼び出し元のスレッドをブロックします。2つ目は非同期です。返された Uni
は受信時にレスポンスを取得します。この場合、呼び出し元のスレッドはブロックされません。そのため、他のことができます。これらのメソッドの使い方は後ほど見ていきます。まずは、その前に少し設定を見ましょう。 application.properties
に追加します。
# Configure the transaction-service (rest client) transaction-service/mp-rest/url=http://localhost:8080
もちろん、URLを更新します。https://quarkus.io/guides/rest-client では、Rest Client の使用方法や設定について詳しく説明しています。
着信トランザクションごとにサービスを呼び出す
OK、私たちはサービスを呼び出すことはできます。ですが、覚えておいて下さい。私たちはそれを着信トランザクションごとに呼び出したいのです。そしてこれらのトランザクションは Kafka のトピックから来ます。Reactive Messaging を使えば、今すぐに Kafka に対処する必要はありません。私たちはロジックに集中できます。私たちはチャネル(データのストリーム)を持っているとしましょう。これはトランザクションを転送するための物です。これを最初のチャンネル in
と呼びます。
また、私たちはリモートサービスからのレスポンスを別の Kafka トピックに書き込みたいと思います。再度言いますが、Kafka に対処する必要はありません。レスポンスを out
という名前のチャンネルに書き込んだとします。
そこで、以下のような(不完全な)コードがあります:
@ApplicationScoped public class TransactionProcessor { @Incoming("in") // 最初のチャネル - 私たちはそこから読み込む @Outgoing("out") // 2 つ目のチャネル - 私たちはそこへ書込む public TransactionResult sendToTransactionService(Transaction transaction) { // ここで私たちのサービスを読み出す必要がある } }
@Incoming
は読み込みチャンネルを設定します。@Outgoing
は書き込みチャンネルを設定します。しかし、何かが足りないです...。 私たちはリモートサービスを呼び出す必要があります
@ApplicationScoped public class TransactionProcessor { private static final Logger LOGGER = Logger.getLogger("TransactionProcessor"); @Inject @RestClient TransactionService service; @Incoming("in") @Outgoing("out") @Blocking public TransactionResult sendToTransactionService(Transaction transaction) { LOGGER.infof("Sending %s transaction service", transaction); return service.postSync(transaction); } }
まず最初に、Rest Client を注入します。そして、私たちのメソッドの中でそれを呼び出すだけです。
@Blocking
について疑問に思うことがあるかもしれません。リアクティブメッセージングでは、ブロッキングコードを使用している場合はその旨を示す必要があります。デフォルトではそれはイベントループアーキテクチャが使用されているためです。便利な反面、@Blocking
を乱用すべきではありません。それは、スレッドプールに依存して同時実行性が制限されるためです。しかし、それはあなたのロジックを同期させます。
非同期操作の使用
TransactionService
が実行する 2 番目のメソッド postAsync
を使用することで、私たちは @Blocking
アノテーションを取り除けます。
@ApplicationScoped public class TransactionProcessor { private static final Logger LOGGER = Logger.getLogger("TransactionProcessor"); @Inject @RestClient TransactionService service; @Incoming("in") @Outgoing("out") public Uni<TransactionResult> sendToTransactionService(Transaction transaction) { LOGGER.infof("Sending %s transaction service", transaction); return service.postAsync(transaction); } }
post
メソッドの async バリアントを使用すると、@Blocking
を削除できます。 Uni
を直接返します。その Uni
はリモートサービスの応答を受信すると、その値を out
チャネルに書き込みます。
チャネルを Kafka にマッピング
ここまでは順調です。私たちのコードを Kafka に接続する時が来ました。リアクティブメッセージングでは、チャネルをコネクタにマッピングします。ここでは Kafka です。そこで、in
チャンネルと out
チャンネルが Kafka のトピックであることを示すようにアプリケーションを設定する必要があります。もう一度、application.properties
ファイルを編集し、追加します。
mp.messaging.incoming.in.connector=smallrye-kafka mp.messaging.incoming.in.topic=transactions mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer mp.messaging.incoming.in.auto.offset.reset=earliest mp.messaging.incoming.in.enable.auto.commit=false mp.messaging.outgoing.out.connector=smallrye-kafka mp.messaging.outgoing.out.topic=output mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
最初のブロックは in
チャンネルについてです。 それは transactions
Kafkaのトピックに接続します。データはカスタムデシリアライザでデシリアライズされます。最後の他のプロパティは、オートコミット(Reactive Messaging が代わりにコミットを処理してくれます)を無効にします。そして、最後にコミットされたオフセット以降のデータを読み込みます。
2 番目のブロックでは、out
チャネルを設定します。 output
Kafkaトピックと接続します。そして、値のシリアライザを設定します。この簡単な例では、データをJSONで書きます。
トランザクションが Kafka transaction
トピックに書き込まれると、それは私たちの処理コンポーネントで受信されます。そして、リモートサービスに送信され、その結果が output
Kafka トピックに書き込まれます。
2020-08-27 10:04:44,141 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service 2020-08-27 10:04:44,196 INFO [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20 2020-08-27 10:04:44,240 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service 2020-08-27 10:04:44,245 INFO [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10
output
トピックの中を見てみると、TransactionResult
が流れているのが確認できます。
終わりました!
数行のコードと少しの設定で、Kafka トピックからデータを読み取り、リモートサービスを呼び出し、その結果を別の Kafka トピックに書き込めます。単純な話です。
自分でやってみたいと思いますか?この GitHub リポジトリのコードをチェックアウトして、readme の指示に従ってください。
Reactive Messaging と Rest クライアントには他にも宝石が含まれています。関連するガイドやドキュメントを確認して詳細を確認してください。