Mutiny でページ分割されたAPIを扱う

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、Handling paginated APIs with Mutiny の翻訳記事です。


Mutiny の冒険の最初の方で、友人のアレックスが面白い問題を抱えてやってきました。アレックスは、REST サービスからリアクティブ的にデータを取得したいと考えていました。これまでのところ、問題はありません。このために必要なものはすべて私たちの道具箱に入っています。しかし、このサービスは、多くのサービスと同様に、ページ分割を利用しています。ああ!そうすると少し辛くなるんですよね。アレックスは全てのアイテムを取得して、ストリームとして処理したかったのです。ですが、一括では取得できません。ページごとにサービスを起動し、アイテムを抽出して、ストリームに入力する必要があります。

では、どうやってリアクティブに実現し、正気を失わずに適切なアイテムのストリームを構築するのでしょうか?見てみましょう!

Punk API

まずはAPIが必要です。アレックスが紹介してくれたのは、ビールを取得するための REST API である Punk API です。それはそれで楽しいし、さらに良いことに、ページ分割を使用しています。APIを手に入れた!

https://api.punkapi.com/v2/beers?page=1 を呼び出すと、以下のようなJSON 配列が得られます。

[
  { 
    first beer 
  }, 
  { 
    second beer 
  }, 
 // ... 
] 

私は、各オブジェクトの内容を議論することはしません。ですが、ドキュメントのページでは、そのことについて素晴らしい仕事をしています。ページ分割の側面に注目してみましょう。最初に、page クエリパラメータを渡しました。これはどのページが欲しいかを示します。一般的に、ページを取得する際に、API は次のページがあるかどうかを知る方法を提供しています(JSONドキュメント内の特別なフィールドやHTTPヘッダ)。ですが、Punk API ではヒントを提供していません。だから、すべてのビールを取得するには、1,2,3.. ページ目のサービスを呼び出す必要があります。 これは、返されたJSON 配列が空になるまで呼び出します。

命令的な世界では、すべてのビールを取得するために、あなたはこのようなことをするでしょう。

List<Beer> beers = ...; 
int page = 1; 
List<Beer> batch = ... 
do { 
  batch= getBeersFromPage(page); 
  beers.addAll(batch); 
  page = page + 1; 
} while (! batch.isEmpty()); 

リアクティブで同じことを実現し、ビールのストリームを作るにはどうすれば良いでしょうか?

https://quarkus.io/assets/images/posts/mutiny-pagination/mutiny-pagination.png

一歩ずつ進めていきましょう。

単一ページの取得

まずは、1つのページを取得する方法を見てみましょう。今回は Vert.x Web Client を使いますが、Mutiny API を提供しているリアクティブな HTTP クライアントであれば何でも使えます。

// クライアントを作成 
WebClient client = WebClient.create(vertx, new WebClientOptions() 
  .setDefaultHost("api.punkapi.com") 
  .setDefaultPort(443) 
  .setSsl(true)
);

// 最初のページを取得 
Uni<List<Beer>> uni = client.get("/v2/beers?page=1") 
  .send() 
  .onItem()
  .transform(Pagination::toListOfBeer); 

このスニペットは Web クライアントを作成します。そして、そのクライアントを使って、最初のページを取得します。

結果(onItem)を受け取ると、JSON 配列をビールのリストに変換します。

このコードをメソッドで抽出して、ページ番号をパラメータにしてみましょう。

private static Uni<List<Beer>> getPage(WebClient client, int page) { 
  return client.get("/v2/beers?page=" + page) 
    .send() 
    .onItem()
    .transform(Pagination::toListOfBeer); 
} 

ここまでは順調です。

複数のページを取得

というわけで、1 ページを取得してそこから項目を抽出する方法がわかりました。この操作をページごとに繰り返し、ストリームを提供するだけです。

Mutiny はUniを複数回繰り返すことで Multi を作成するメソッドを提供しています。その内部では、Uni を返すメソッドを呼び出して、その上でサブスクライブしています。しかし、私たちは進みながら、今のページを受け渡す必要があります。Mutiny は、Uni を作成するメソッドにページ番号をインクリメントさせるために、状態を保存する可能性を提供します。

Multi.createBy()
  .repeating()
  .uni(AtomicInteger::new, page -> getPage(client, page.incrementAndGet()) )

上記のコードでは、getPage メソッドで返された Unis によって、出されたアイテムを持つストリームを作成しています。ページ番号(AtomicIntegerに格納されている)を毎回インクリメントしています。だから、それは1、2、3... ページを取得します。そして、毎回受信したList<Beer> を下流に放出します。

しかし、どこかの時点で止めなければなりません。先ほども言いましたが、返されたリストが空になったときに停止できます。

Multi<List<Beer>> multi = 
  Multi.createBy()
    .repeating()
    .uni(AtomicInteger::new, page -> getPage(client, page.incrementAndGet()))
    .until(List::isEmpty);

until 句は、いつ反復を停止しなければならないかを示します。取得したリスト(getPageで生成されたリスト)を受け取り、このリストが空の場合は繰り返しを停止します。リストにビールが含まれている場合は、次のページを取得します。

ビールの開梱

私たちは今、リストのストリームを持っており、各リストにはビールのセットが含まれています。もうすぐだが、アレックスがビールのストリームを欲しがっている。だから、ビールを開梱しないといけない。

これを実現するための最初のアプローチは、transformToMultiAndConcatenate を使用します。つまり、各リストに含まれるビールを含む新しい multi を作成し、これらのmultis連結します。

Multi<Beer> multi = 
  Multi.createBy().repeating()
    .uni(AtomicInteger::new, page -> getPage(client, page.incrementAndGet()))
    .until(List::isEmpty)
    .onItem()
    .transformToMultiAndConcatenate(l -> Multi.createFrom().iterable(l));

連結について疑問に思ったことはありますか? 他のブログ記事もチェックしてみてください

https://quarkus.io/assets/images/posts/mutiny-pagination/disjoint.png

これは一般的な操作なので、Mutiny は全く同じことを行う disjoint メソッドを提供します。

Multi<Beer> multi = 
  Multi.createBy().repeating()
    .uni(AtomicInteger::new, page -> getPage(client, page.incrementAndGet()))
    .until(List::isEmpty)
    .onItem()
    .disjoint(); 

これで終わりです!

リアクティブのメリット

私たちは私たちのストリームを持っています、今はそれを使用する時間です!それでは例として、説明文に「IPA」(流行に乗ろう)と書かれているビールの最初の 10 種類を検索してみましょう。

multi.transform()
  .byFilteringItemsWith(beer -> beer.description.contains("IPA"))
  .transform()
  .byTakingFirstItems(10);

私たちのストリームの利点は、すべてのページを取得しないことです。十分なビールが手に入るととすぐにその繰り返しを止めます。どうやって?なぜならば、上流にアイテムを増やす必要がないことを伝え(キャンセル)、その繰り返しを止めるからです。このようにしてページ分割された API からアイテムを取得することで、リクエストの数を減らせます。その結果としてリモートサービスの負荷を減らせます。

喉が渇きました?

このコードを試してみたい方は、この gist をチェックしてみてください。jbang ですぐに実行できます。

jbang https://gist.github.com/cescoffier/18a326a5c057392bec54d95ec5a06ca6

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。