Mutinyで失敗した時の対処法

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、How to handle failures with Mutiny の翻訳記事です。

先週、Mutiny での障害処理についていくつか質問を受けました。だから、もう少し説明が必要なのかもしれない。

失敗はイベント

まず、Mutiny はイベント駆動型のリアクティブプログラミングライブラリです。Mutiny では、イベントを処理します。上流の UniMulti はこれらのイベントを伝播し、あなたはそれらを処理できます。これらのイベントは、アイテム、完了、キャンセルと... 失敗があります

Multi.createFrom().range(0, 10)
    .onItem().invoke(i -> System.out.println("Received item " + i))
    .onCompletion().invoke(() -> System.out.println("We are done!"))
    .onCancellation().invoke(() -> System.out.println(
        "The downstream does not want our items anymore!")
    )

    .onFailure().invoke(t -> System.out.println(
        "Oh no! We received a failure: " + t.getMessage())
    )

失敗を受けた時の対処法は?

先ほどのスニペットのようにアクションを呼び出す以外にも、失敗を受けたときにできることは複数あります。

よくあるのはリカバリすることです。あなたは、特定のアイテムを渡すか、別の Uni でリカバリできます。

upstream
    .onFailure().recoverWithItem(failure -> "hello (fallback)")
    .subscribe().with(i -> System.out.println("Received: " + i));

upstream .onFailure().recoverWithUni(failure -> getAnotherUni(failure)) .subscribe().with(i -> System.out.println("Received: " + i)); 

Multi では、あなたは別の Multi を提供したり、ストリームを完成させたりすることでリカバリできます。

upstream
  .onFailure().recoverWithCompletion();

もしあなたがシステムを信頼していれば、リトライする事もできます。注意点としては、まず安全にリトライできるかどうかを確認する必要があります

upstream
      .onFailure().retry()
        .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10)).atMost(10)
      .subscribe().with(i -> System.out.println("Received: " + i));

あなたは失敗を変換することもできます。例えば、低レベルの失敗をよりビジネスに適したものにマッピングできます。2 番目の障害を下流に伝播し、低レベルの障害を隠します。

Uni.createFrom().failure(new IOException("boom"))
      .onFailure().transform(t -> new BusinessException(t))

失敗は終端

失敗は終端イベントです。上流で失敗が伝播した場合は、正常に動作できないことを意味します。Uni の場合は、アイテムを持っていても失敗しても構わないので、それはそれで問題ありません。でも Multi の場合はちょっと複雑です。

リカバリしたとしても、失敗を処理することで、残りの流れを手に入れられません。あなたの上流は... 壊れています

以下のコードを見てみましょう。

List<String> list = Multi.createFrom().range(0, 10)
      .onItem().invoke(v -> {
              if (v == 7) {
                throw new IllegalArgumentException("We don't like seven!");
              }
      })
      .onFailure().recoverWithItem(7)
      .map(integer -> integer.toString())
      .onItem().invoke(s -> System.out.println(s))
      .collectItems().asList()
      .await().indefinitely();

これは、[1,2,3,4,5,6,7] を生成します。そして、残りの流れを生成しません。7onItem().invoke() ステージが呼び出されると、失敗が発生します。それが流れを止めます。上流からそれ以上のアイテムを処理することはありません。

では、どうすればいいのか?隔離しましょう!

ステージが失敗を送信すると、ストリームを終了する失敗を送信します。そして、上流への購読をキャンセルします(正常に動作していないため、アイテムが足りないことを通知します)。そのため、上流から他のアイテムを処理し続ける必要がある場合は、その失敗を隔離して、上流への購読をキャンセルしないようにすればいいだけです。

これを達成するもっとも一般的なアプローチは以下です。

List<String> list = Multi.createFrom().range(0, 10)
    .onItem().transformToUniAndConcatenate(i ->
            // Isolate the failure in this block
            Uni.createFrom().item(i)
                    .onItem().invoke(v -> {
                        if (v == 7) {
                            throw new IllegalArgumentException("We don't like seven!");
                        }
                    })
                    .onFailure().recoverWithItem(7)
    )
    .map(integer -> integer.toString())
    .onItem().invoke(s -> System.out.println(s))
    .collectItems().asList()
    .await().indefinitely();

基本的には、失敗する可能性のある操作を隔離します。もし失敗したら、リカバリーします。しかし、キャンセルはそのアイテムをキャンセルするだけで、完全なストリームはしません。それは次の 1 つのアイテムなどを受け取ることを意味します。このコードは、期待されるリストを生成します。

まとめ

これで失敗を処理して、優雅に流れを続けることができるようになりました。

Mutiny についてもっと詳しく知りたい方は、以下の動画をチェックしてみてください。

youtu.be

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。