Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。
この記事は、Quarkus.io のブログ記事、Emitter - Bridging the imperative and the reactive worlds の翻訳記事です。
Kafka と Avro についての以前のブログ記事(原文)では、エミッタ(放出するもの)を使って Kafka のメッセージを送信していました。
この記事では、このエミッタの構造をもう少し詳しく見ていきます。
エミッタの注入
エミッタの注入は簡単です。ターゲットとするチャンネルを指定します。つまりどこにメッセージを送信するかです。
@Inject @Channel("movies") Emitter<Movie> emitter;
リアクティブメッセージングは、主要な抽象化としてチャネルを使用することを覚えておいてください。これらは、インメモリチャネルであったり、リモートブローカにマッピングされていたりします。
先ほどのコードスニペットでは、Emitter<Movie>
を注入しています。ペイロードとして movies を含むメッセージを送信することを意味します。そのため、指定された型はペイロードの型となります。これにより、ペイロードを直接 (メッセージ内で自動的にラップして) 送ったり、ペイロードとして movie を含むより詳細なメッセージを送れます。
Movie movie = ... // ペイロードを直接送る emitter.send(movie); // メッセージを送る emitter.send(Message.of(movie));
ペイロードの送信
ペイロードを送信することは、データを送信する最もシンプルな方法です。ペイロードを Movie
のインスタンスのようにsend
メソッドに渡すだけです。これは、ペイロードをラッピングしたシンプルな Message
を作成するだけです。
ペイロードと共に使用される場合、その send
メソッドはメッセージの処理が成功したか失敗したかを示す CompletionStage
を返します。
emitter.send(movie) .whenComplete((success, failure) -> { if (failure != null) { System.out.println("D'oh! " + failure.getMessage()); } else { System.out.println("Message processed successfully"); } });
処理していると、後でイベントの排出を見ることになりますが、これは非同期的に発生しています。そのため、返された CompletionStage
は、メッセージがいつ処理されたかを知れます。メッセージが確認されると、CompletionStage
は正常に完了します。ほとんどの場合、スムーズに処理が完了しているか、無事にブローカーにメッセージが送られていることを意味しています。何かあった場合、例外的に CompletionStage
が完了します。渡された例外を見れば、その理由がわかります。
メッセージの送信
ペイロードの送信はより簡単ですが、Kafka での書き込みの設定やトレース情報などのメタデータをメッセージに添付したい場合もあるでしょう。エミッタはメッセージの送信も可能です。なので、必要なメタデータを添付します。以下の例では、アウトバウンドの Kafka レコードを設定します。キーやトピックなどを設定します。そうすれば、異なるトピックにメッセージを派遣したり、動的に決定できます。
OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
.withTopic("movies")
.withKey(movie.getYear())
.build();
emitter.send(Message.of(movie).addMetadata(metadata));
放出は非同期
エミッタは、命令的な世界とリアクティブな世界の間の架け橋となります。メッセージを発した場合、このメッセージはすぐには処理されません。メッセージを使用する下流のコンポーネントは、Reactive Streams の一部です。メッセージをすぐに渡すことは、Reactive Streams プロトコルに違反します。下流のコンポーネントがこのメッセージを受け入れる準備ができていることを確認しなければなりません。その結果、エミッタはメッセージを直接プッシュするのではありません。下流のキャパシティ(Reactive Streams の専門用語でのリクエスト)を処理するために使用されるバッファにそれをエンキューしています。
下流のコンポーネントは、リクエストに応じてメッセージを受信します。そのコンポーネントはその容量を超えることがないようにします。
オーバーフロー管理
しかし、バッファは... オーバーフローします。メッセージを出しすぎて下流が追いつかない場合、それらのメッセージは最大容量に達するまでバッファに保存されます。そうすると、これ以上発せられなくなり、発しようとすると例外が発生します。しかし、この場合はどうすればいいのでしょうか?エミッタを注入する際には、オーバーフロー戦略を設定できます。例えば、バッファサイズを設定したり、制限されないバッファを使用したり、メッセージを捨てたり、失敗したり、バックプレッシャーを無視して下流に任せたりなどできます。デフォルトではバッファを使用します。ですが、ユースケースによっては異なる設定が必要になるかもしれません。
@Inject @Channel("movies") @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1000) Emitter<Movie> emitter1; @Inject @Channel("movies") @OnOverflow(value = OnOverflow.Strategy.NONE) Emitter<Movie> emitter2; @Inject @Channel("movies") @OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER) Emitter<Movie> emitter3;
結論
この記事では、Reactive Messaging の Emitter
の構成について簡単に紹介します。詳細については、SmallRye Reactive Messaging のドキュメントを参照してください。
次の Quarkus バージョン(1.9)では、この機能は2つの非常に優れた機能強化によって改善されます。まずはじめに、それは Mutiny API との統合を容易にする Mutiny とわずかに異なる物を提供します。そうすれば、Kafka の場合はメタデータを使わずに直接キーと値のペアを出力できるようになります。
ご期待ください!これらをフォローアップ記事でカバーします!