- はじめに
- 内容
Cloud Functions を利用して BigQuery へデータを読み込む方法をいくつか試す機会がありました。BigQuery でのデータ読み込みの方法として、バッチ処理とストリーミング処理の2つがあります。バッチ処理、ストリーミング処理の特性と、それらを利用したサービスについて、どのように選択すべきかを考えていきたいと思います。なお、この記事ではストリーミング処理の中でも、Cloud Data Fusion、Dataflow などのパイプラインを組む場合などに有効なサービスではなく、データを読み込む部分に焦点を当てています。
- 内容
- バッチ処理の概要
- バッチ処理の定義と特徴
バッチ処理は BigQuery の読み込みジョブを利用するもので、ニアリアルタイムでデータ読み込みが不要な場合に有効です。Google Cloud のドキュメントにも記載がありますが、利用は以下の方法で行います。- bq load コマンド(それ相当の API)
- SQL
- BigQuery Data Transfer Service
- Storage Write API(ストリーミングとしても利用可能で詳細はドキュメント参照)
- バッチ処理のメリット
- 処理の内容次第ではありますがコストが低くなることが多い
- バッチ処理のデメリット
- loadjob 制限がある
- バッチ処理の定義と特徴
- ストリーミング処理の概要
- ストリーミング処理の定義と特徴
ストリーミング処理はリアルタイムでデータを読み込みたい場合に特に有効です。Google Cloud のドキュメントにも記載がありますが、利用は以下の方法で行います。- Storage Write API(バッチとしても利用可能で詳細はドキュメント参照)
- Dataflow
- Datastream
- BigQuery Connector for SAP
- Pub/Sub
- ストリーミング処理のメリット
- loadjob 制限への抵触がない
- リアルタイムデータの即時処理に有効
- ストリーミング処理のデメリット
- コストはバッチ処理より高くなる可能性がある(Storage Write API の利用で料金が発生するため)
- Cloud Run Functions で処理する場合、プロトコルバッファ、シリアル化処理を組み込む必要がある(バッファタイプ参照)
- ストリーミング処理の定義と特徴
- loadjob 制限とは何か
- 比較に出てきた loadjob 制限とは、BigQuery でのデータロードジョブに適用される一連の制約を指しています。
- よく引っかかる制限として以下のようなものがあります。
- 1 日のテーブルあたりの読み込みジョブ数:1,500 個のジョブ
- イベントドリブンで行う場合、1500ファイルがきたら制限に達してしまいます。
- 定期的な処理だと1時間あたり、62ジョブ程の loadjob 処理があると上限に達してしまいます(もしファイル単位で読み込む場合1時間に62ファイルしか読めません)。
- BigQuery 上でのバッチ処理とストリーミング処理の違い
- バッチ処理
- BigQuery のストレージ領域に直接書き込まれる
- 読み込んだデータの削除や変更が即時可能
- バッチ処理での対象データを BigQuery へ読み込む際に BigQuery 側の料金はかからない(BigQuery のストレージ料金は発生)
- ストリーミング処理
- BigQuery のストレージ領域に書き込まれず、まずはストリーミングバッファ領域に書き込まれ、その後一定時間後に、ストレージ領域に書き込まれる
- ストリーミングバッファのデータもクエリ可能だが、バッファのデータだった場合、データの削除や変更はストレージに書き込まれるまでできない
- BigQuery Storage Write API で BigQUery へデータ読み込む際に BigQuery 側の料金がかかる
- ストリーミング処理には Streaming inderts と BigQuery Storage Write API の2つがあり、Streaming inderts は以前のストリーミング API とドキュメント上は表記され、以前までの方法であり、新たに利用する場合は、Storage Write API のほうが低コストであり推奨されています。
- バッチ処理
- それぞれの内容の比較
- 利用するサービスも含めて内容を比較してみました。
項目 | Cloud Run Functions(バッチ) | Cloud Run Functions(ストリーミング) | BigQuery Data Transfer Service | Datastream |
---|---|---|---|---|
読み込み方法 | バッチ | ストリーミング | バッチ | ストリーミング |
実装内容(個人的な所感) | 実装するコード内容がシンプル | 実装するコード内容が少し煩雑(プロトコルバッファの定義やシリアル化など) | コーディング不要で容易 | コーディング不要で容易 |
スケジュール実行 | Cloud Scheduler で実装 | Cloud Scheduler で実装 | スケジュール機能で実装 | – |
イベントドリブン | 可能 | 可能 | こちらに従って対応可能。なお、AWS の場合、SQS の追加構築が必要 | – |
読み込みの自由度 | データの加工などスキーマの状態など必要に応じて定義可能 | データの加工などスキーマの状態など必要に応じて定義可能 | 読み込み時には柔軟に対応できない可能性もあるため、Cloud Storage の制限については事前に確認が必要 | テーブルの対象列などの絞り込みが可能 |
特徴 | 特に頻度などこだわりがなければまずはバッチでの読み込みを検討 | ニアリアルタイムでのデータ読み込みならストリーミング | なるべくマネージドで行い、データの転送の最小時間にも対応していれば良い | データベースをソースとしたニアリアルタイムレプリケーション、バックフィルの負荷などソースDBへの影響も検討が必要 |
適しているケース | ・データの加工など処理内容の自由度が高い ・実行頻度が loadjob の制限に抵触しなければまずはこれで良いと思う ・ある程度データをためて読み込む場合、読み込み処理に耐えうるスペックかなど検討がいる →処理が途中で止まって、データが途中まで入ることもあるため |
・データの加工など処理内容の自由度が高い ・読み込み頻度が多い場合、ストリーミング(今は Storage Write API)で読み込む |
・読み取りのソースが対応している場合は選択肢にはいる ・処理内容に自由度を求めず決まった処理で良い場合 ・バッチ処理のため loadjob 抵触が気になるので、連携頻度などにより loadjob 回数が多い場合は制限に引っかかる可能性あり |
・ソースがデータベースの場合 ・データベースの内容の加工が列や行の除外などで済む場合 ・ニアリアルタイムでデータ連携したい場合 |
考えておきたいこと | ・エラーになった際の読み込み方法 ・ステータス管理(Firestore など)を行い、取り込み失敗を把握する |
・エラーになった際の読み込み方法 ・ステータス管理(Firestore など)を行い、取り込み失敗を把握する |
・エラーになった際の読み込み方法 | ・データソースのカラムの変更などがあったときの対応 |
まとめ
- 記載した内容を考慮し、想定されるユースケースについて以下にまとめます。
- バッチ処理のユースケース
- loadjob 制限に引っかからない処理であれば、基本的にはバッチ処理を優先で考えます。
- 例えば、定期的に固定された loadjob 数に収まる読み込み内容(固定されたファイル数の処理や複数ファイルを1ファイルとして扱って処理する)であれば、バッチ処理でも良いと思います。
- さらに制限に抵触しないように以下のように(一例ですが)考慮できれば、バッチ処理でも短い時間間隔でデータ更新することも可能ですので、まずはバッチ処理で検討したほうが良いと思います(ただし、回避を意識するあまり、処理が煩雑になることもあるため注意)。
- 前処理などで複数ファイルを制限に抵触しないようなファイル数に纏める
- 制限に抵触しないようにテーブルを分けて処理する
- loadjob 制限に引っかからない処理であれば、基本的にはバッチ処理を優先で考えます。
- ストリーミング処理のユースケース
- ニアリアルタイムでのデータ読み込み
- loadjob 制限に抵触する恐れがある処理はストリーミング処理にする
- バッチ処理のユースケース
- バッチ処理で無理にでも loadjob 回避したいような記載をしてしまいましたが、煩雑にならない範囲で回避方法を試した結果、loadjob の制限に少しでも抵触するような可能性があれば、素直にストリーミング処理を検討したほうが、運用面で疲弊はしないと思いました。