Mutiny との同時並行な非同期アクション

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、Concurrent asynchronous actions with Mutiny の翻訳記事です。


今週は、並行処理を中心とした広く普及しているユースケースについて聞かれました。このユーザーは、2つのマイクロサービスを並行して呼び出し、両方の結果が出たらそれらを結合して処理を続けたいと考えていました。基本的には以下のパターンです。

https://quarkus.io/assets/images/posts/mutiny-concurrent-uni/pattern.png

非リアクティブなアプローチでは、両方の呼び出しは呼び出し元のスレッドをブロックし、ワーカースレッドプールを使用している場合を除き、呼び出しは同時進行ではありません。ワーカースレッドプールを使用していても、これらのスレッドはブロックされている可能性が高く、無駄にリソースを消費してしまいます。

しかし、心配はありません。Quarkus のリアクティブと Mutiny は、このシナリオを処理するためのすべてを持っています。

2つのサービスを呼びだそう

今回の記事では、Vert.x Web Client というリアクティブな HTTP クライアントを使ってみます。ノンブロッキング I/O を活用して、高いパフォーマンスと真のノンブロッキングを実現しています。隠れたスレッドプールには依存しません。Quarkus Rest Client を使うこともできます。しかし、今のところワーカースレッドを使っています。

どのクライアントを使うにしても、呼び出すためにいくつかのリモートサービスが必要です。使ってみましょう。

まず最初に、私たちの見積りを取得するために必要なコードを見てみましょう。どちらのサービスも似たようなものではありますが、対応の構造が少し異なります。結局はこれで終わりですね。

private static Uni<String> getProgrammingQuote(WebClient client) {
    return client.getAbs(PROGRAMMING_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("en") + " (" + r.body().getString("author") + ")");
}

private static Uni<String> getChuckNorrisQuote(WebClient client) {
    return client.getAbs(CHUCK_NORRIS_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("value"));
}

これら2つのメソッドは、WebClient を受け取ります。そして、サービスを呼び出し、JSON レスポンスを取得し、それを抽出します。これらは両方とも Uni を返す。そう、これらは非同期なんです。その結果(見積書)は、利用可能な場合には後ほど提供されます。また、Uni を返すということは、返された Uni を誰かがサブスクライブした時にのみサービスが起動するということです。複数回サブスクライブしている場合は、複数回に分けて呼出をすることになります。

Unis の組み合わせ

今のところ、サービスを呼び出す方法は2つあります。しかし、上の図のように同時に並行に呼びたいです。

Mutiny では、Unis が生産したアイテムを組み合わせる方法を提供しています。

Uni<Tuple2<String, String>> tuple = Uni.combine().all()
    .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
    .asTuple();

誰かが Uni tuple をサブスクライブすると、サービスを呼び出す getProgrammingQuote(client)getChuckNorrisQuote(client) Unis を購読します。そのため、それらのリクエストが放出されます。サービスが並行して呼び出されます。

両方の返答がある場合は、複数の項目を運ぶシンプルな構造体であるタプルに結合します。

つまり、私たちのサービスを同時に呼び出すことは、非常に簡単なことです。実現したいサービスや非同期アクションを表す Unis を作成します。そして、Uni.combine().all() を使用してそれらを結合するだけです。 タプルを使用して結果を結合するか、combinator 関数を使用して結合するかを決めることができます。

すべてをまとめる

// Web Client を作成
WebClient client = WebClient.create(vertx);

// 2つのUnisの結果をタプルに結合
Uni.combine().all()
        .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
        .asTuple()

        // サブスクライブ(これが呼び出しのトリガーとなる)
        .subscribe().with(tuple -> {
    System.out.println("Programming Quote: " + tuple.getItem1());
    System.out.println("Chuck Norris Quote: " + tuple.getItem2());
});

それだ!このコードが実際に動作しているのを見たい場合は、この gist を確認してください。JBangで直接実行できます。

jbang https://gist.github.com/cescoffier/1ed68bef12b798529e10350f77686e9a

楽しもう

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。