エムオーテックス株式会社が運営するテックブログです。

RxJSでAPIの非同期実行を工夫した話

はじめに

こんにちは、フロントエンド開発チームの南です。
LANSCOPEエンドポイントマネージャークラウド版(以下、LANSCOPE クラウド版)では、フロントエンド開発のフレームワークにAngularを採用しています。
今回は、Angularで採用されているRxJSを使って非同期通信処理を工夫した点についてご紹介します。

RxJSとは

RxJSとは JavaScript用のリアクティブプログラミングライブラリです。
非同期通信処理やそのレスポンスを受け取って処理するコールバック処理がスッキリ書けます。
また、RxJSのオペレータを使うことで、複数のAPIリクエストを直列/並列に実行したり、レスポンスを加工できるメリットがあります。

RxJSで工夫した点について

LANSCOPE クラウド版はマイクロサービス思考のアーキテクチャとなっており、複数のAPIから取得した情報を組み合わせて一つの画面に表示することがあります。
表示に必要な全てのデータを取得してから画面を表示する仕様だと、取得するデータ量が多い場合、画面表示までに時間が掛かります。
そのため、一度に全てのデータを取得するのではなく、一回で取得できるサイズを指定し、分割して取得することがあります。
このような処理を愚直に書いた場合、コードが複雑になり可読性が落ちます。
そこで、RxJSを使って実際にLANSCOPE クラウド版のSPAにも実装している工夫を次にご紹介します。

※ 載せているコードはサンプルコードにしています。
※ LANSCOPE クラウド版の画面キャプチャはデモデータを表示しています。

ケース1

ケース1は以下のようなリスト系の画面に表示するデータを取得するケースです。

リストに表示するデータの最大数は数万程度を想定していることがあり、一回のリクエストで取得することが難しいため、一度に取得する件数を1000件程度にしていることが多いです。

この処理のポイントはexpandを記述している1行となります。

findList(unsubscribe$: Subject<void>, id: string, pageNo: number, pageSize: number = 1000): Observable<ExampleList> {
  return this._findList(unsubscribe$, id, pageNo, pageSize).pipe(
    expand(v => pageNo < v.last_page_no ? this._findList(unsubscribe$, id, v.pageNo + 1, pageSize) : EMPTY),
  );
}

private _findList(unsubscribe$: Subject<void>, id: string, pageNo: number, pageSize: number = 1000): Observable<ExampleList> {
  const bodyObj = {
    page_no: pageNo,
    page_size: pageSize
  };
  const body = jQuery.param(bodyObj);
  const url = `${this.baseUrl}/xxx/${id}/hoge?${body}`;
  return this.http.get<ExampleList>(url).pipe(
    map(res => res.body),
    mergeMap(x => this.addInfo(x)),
    takeUntil(unsubscribe$)
  );
}

上の処理を図解すると以下のようになります。

リスト画面に遷移すると、まず1000件を取得し、その1000件に付随する情報を別API(addInfo関数)で取得します。
そこまでデータの取得が完了したら、その時点で取得したデータをリストに表示すると同時に裏側ではpage_noをインクリメントし、page_no: 2として1001件〜2000件目のデータを取得します。
こうすることで、データの総量が多い場合でも、一定件数(page_size)のデータを取得して表示しつつ、裏側ではデータを取得し続けることができます。

APIのレスポンスに最後のページ番号(last_page_no)が含まれるようになっているため、
last_page_no > page_no の間はリクエストを続け、値が等しくなったら EMPTY を返すことで非同期処理が完了します。

ケース2

ケース2は設定系の一覧画面で削除する対象の設定にチェックを入れて、チェックした設定を一括で削除するケースです。
例えば、以下のような画面でチェックした設定を一括で削除することができます。

APIは設定一つだけを削除する仕様となっていることが多いため、チェックした件数分APIを実行する必要があります。

delete<T>(ids: T[], f: (id: T) => Observable<DeleteResponse>, concurrent = 100): Observable<DeleteResponse[]> {
  return from(ids).pipe(
    mergeMap(id => f(id), concurrent),
    toArray()
  );
}

単純にチェックした件数全てに対してAPIを並列実行してしまうと、例えば1000件程度並列に実行されて負荷が高くなってしまうため、ここでは並列数をconcurrentという引数に指定し、mergeMapの第二引数に指定することで並列数を制限するようにしています。
f: (id: T) => Observable<DeleteResponse>には設定1件を削除する処理を指定します。
また、全てのAPI実行が完了した後に削除完了のメッセージを画面に表示する必要があり、toArrayとすることでAPIの実行が全て完了した後にレスポンスを配列にまとめて返すようになっています。

上の処理を図解して処理の流れを追っていきます。

設定1000件を一括で削除を実行すると、まず初めに先頭の100件に対して削除のAPIが並列実行されます。

設定1を削除するAPIの実行が完了すると、その時点で並列数が99となり空きが一つ出たため、設定101を削除するAPIが実行されます。

次に設定2の削除が完了すると、また並列数が99となるため設定102を削除するAPIが実行されます。

上の図では例として設定1 -> 設定2の順番で削除が完了するようになっていますが、
設定1 〜 100を削除するAPIを並列で実行しているため、実行完了する順番は順不同となります。

無事1000件全ての削除に成功したら、コンポーネントにレスポンス(DeleteResponse[])が返され、コンポーネントは削除完了のメッセージを表示することができます。

おわりに

今回はAPIの実行にフォーカスした話でしたが、それ以外でもイベント伝達などでRxJSは広く使われています。
また、今回ご紹介したのはRxJSのほんの一部のオペレーターを使った処理であるため、これら以外にどのようなものがあるのか、もし興味があれば勉強してみてはいかがでしょうか。