記事について
最近Databricks案件が増えているので、
改めて学習している中で明確に知らない知識を整理してみました。
検定試験などで使える知識もあると思うので、
興味のある方はご活用ください。
記事概要
本文では以下の点について触れます
💻 開発環境とインターフェース
- ライブラリ管理
- %pip マジックコマンドを使用して、ノートブック単位のスコープでインストール。
- ノートブックの識別
- 最初の行に記述される特殊なヘッダー:# Databricks notebook source。
- ストレージの違い
- DBFS ルート: ワークスペース専用の一時的な保存場所。
- 外部マウント(Mount): ユーザーが管理する S3/ADLS などの外部ストレージ。
- 対話型ツール(ウィジェット)
- text, dropdown, combobox, multiselect の4種類でパラメーターを動的に受け取り。
- ビューの作成
- エイリアス(AS)を設定することで、元のデータ構造を維持しつつ、利用者向けの分かりやすい命名を提供。
- セキュリティ
- Secrets(秘密情報)の取り扱い:反復処理などで不用意に露出させない運用。
⚡ パフォーマンスとアーキテクチャ
- Spark UIの活用
- 「Stages」や「Storage」タブからディスクへの書き込み状況やキャッシュ状態を確認。
- 処理の効率化
- %sh(単一ノード)の限界を理解し、大規模データはSparkによる分散処理へ移行する。
- 最適化技術:述語プッシュダウン(Predicate Pushdown)
- 物理プランの段階でフィルタリングを適用し、読み込むデータ量を最小化。
- 結合の高速化:broadcast()
- 小さいテーブルを全ノードに配布することで、コストの高い「シャッフル」を回避。
- スキュー(歪み)対策
- 特定パーティションへのデータ偏り(Data Skew)を特定し、処理のボトルネックを解消。
- クラスタータイプ:High Concurrency(高並列)クラスター
- 複数ユーザーによる同時クエリ実行に特化し、リソースの分離と最適化を行う。
🔄 ストリーミングとCDC(変更データ捕捉)
- Structured Streaming
- 無限に増え続けるテーブル(Unbounded Table)としてデータを扱うプログラミングモデル。
- 状態管理(checkpointLocation)
- クエリの進行状況を記録。スキーマ変更時には新しいパスを指定する必要がある。
- ストリーム間結合
- タイムスタンプ等をキーに、一致する相方が届くまで状態(State)を保持・追跡。
- 変更データフィード(Change Data Feed: CDF)
- 挿入・更新・削除の「差分」を抽出。Time Travel(断面の復元)との使い分け。
- CDCの自動処理
- 外部システムからの変更ログを検知し、APPLY CHANGES INTO 等でDeltaテーブルへ反映。
📊ジョブ管理とガバナンス
- 実行履歴の保持
- ジョブの実行結果は、デフォルトで 60日間 または 5,000件 まで保持。
- API活用:Jobs API
- GET /api/2.1/jobs/get を使用して、マルチタスクジョブの詳細や構成を確認。
- データ保護(PIIデータ対策)
- パーティション分割とACL(アクセス制御)を組み合わせ、物理的な境界を利用した削除・保護の最適化。
- データ品質管理
- パイプライン外の管理用テーブルで品質ルールを一括保持し、動的なバリデーションを実現。
- Delta Live Tables (DLT)
- パイプラインの自動化と pipelines.reset.allowed プロパティによる再計算の制御。
以下本文
最適化された書き込みとは
- Databricksの「最適化された書き込み(Optimized Writes)」は、Delta Lakeテーブルへのデータ書き込み時に、多数の小さなファイルが生成される「小ファイル問題(Small File Problem)」を自動的に回避し、最適なファイルサイズ(通常128MB)に統合して書き込む機能です
- 多数の小さなファイルにどのような問題がある?
- メタデータ管理のオーバーヘッド
- ファイルシステム(S3やAzure Data Lakeなど)は、ファイルを開くたびに「どこにデータがあるか」を確認するメタデータへのアクセスが発生します。
- 1GBのファイルが1枚: メタデータの読み取りは1回。
- 1KBのファイルが100万枚: メタデータの読み取りが100万回発生し、それだけで数分かかることもあります。
- I/O(入出力)の非効率
- ディスクやネットワークからの読み込みには、準備運動(シークタイムや接続確立)が必要です。小さなファイルばかりだと、「データの読み込み時間」よりも「ファイルを開閉する時間」の方が長くなってしまいます。
- 並列処理の限界
- メタデータ管理のオーバーヘッド
ノートブック レベルのスコープを持つ Python パッケージをインストールする方法
- Databricksにおいて、特定のノートブック内でのみ有効なPythonライブラリをインストールするには、%pip マジックコマンドを使用するのが標準的かつ最も推奨される方法です。
デフォルトのデータ保持閾値の時間は何日?
- 7日間
- Delta Lakeで古いファイル(削除済みまたは更新前のデータ)をクリーンアップする VACUUM コマンドに関連するデフォルトの保持期間は 7日間 です。
パーティションがディスクに書き込まれていることを示す主な指標を提供する Spark UI の 2 つの場所はどこ?
- Stages タブ(および Stage 詳細ページ)
- 特定のステージを選択すると表示される「Summary Metrics」や「Tasks」のテーブルで、以下の項目を確認できます。
- Storage タブ
- キャッシュ(.persist() や .cache())されたデータセットの状態を確認する場所です。
- Storage Level: ここで DISK_ONLY や MEMORY_AND_DISK と表示されている場合、データがディスクに保持されていることを示します。
- Size on Disk: キャッシュされたパーティションのうち、実際にディスクに書き込まれた容量を確認できます。
以下のコマンドで、特定のパーティションのディスク書き込みが問題でないか確認できます
SELECT
partition_column_name,
COUNT(*) as row_count,
SUM(size_in_bytes) / 1024 / 1024 as size_mb -- ファイルサイズを取得できるメタデータがある場合
FROM
table_name
GROUP BY
1
ORDER BY
row_count DESC
Databricks Python ノートブックの最初の行は何になりますか?
# Databricks notebook source
- この行は、Databricksがノートブックをソースファイル(.pyファイルなど)としてエクスポートしたり、外部のリポジトリ(GitHubなど)と同期したりする際に、 「このファイルはDatabricksのノートブック形式である」 ことを識別するための特殊なヘッダー(マジックコメント)です。
- DBFS ルート ストレージと、dbutils.fs.mount() を使用してマウントされた外部オブジェクト ストレージの違いは?
- Databricksにおいて、DBFS(Databricks File System)ルートと 外部マウント(dbutils.fs.mount()) は、どちらも dbfs:/ 形式のパスでアクセスできますが、その実体と運用面には大きな違いがあります。
| 項目 | DBFS ルート ストレージ | 外部オブジェクト ストレージ (Mount) |
|---|---|---|
| 実体 | Databricks ワークスペース作成時に自動生成される専用の S3/ADLS バケット。 | ユーザーが自身で管理する既存のクラウドストレージ(S3, ADLS, GCS)。 |
| 主な用途 | ライブラリの保存、初期サンプルデータ、一時的なログ、Hive メタストアのデフォルト場所。 | 本番データ、大規模なデータレイク、共有アセット。 |
| セキュリティ | ワークスペースの全ユーザーが読み書き可能(詳細な権限管理が困難)。 | クラウド側のIAMロールやサービスプリンシパルで詳細なアクセス制御が可能。 |
| データの永続性 | ワークスペースを削除すると、このストレージも原則削除される。 | ワークスペースを削除しても、ストレージ内のデータはそのまま残る。 |
| 推奨度 | 低(機密データの保存は非推奨)。 | 高(データガバナンスの観点から推奨)。 |
%sh使うと約 1 GB のデータを抽出してロードするのに 20 分以上かかる可能性ある?どうすれば対応できる?
- なぜ %sh は遅いのか?(20分以上かかる理由)
- 単一ノードの限界
- %sh はドライバーノード(親玉のPC1台)だけで動作します。Databricks の最大の強みである「複数台の分散処理(Spark)」を一切使っていません。
- メモリ不足とディスクI/O
- 1GBのデータをドライバのローカルメモリやディスクで処理しようとすると、リソースが枯渇し、スワップ(処理の停滞)が発生します。
- ネットワークのボトルネック
- クラウドストレージ(S3/ADLS)との通信を、最適化されていない標準的なシェル経由で行うため、並列転送ができず非常に時間がかかります。
- 単一ノードの限界
- 推奨される対応策:20分かかる処理を 数秒〜数分 に短縮するためのステップです。
- %pip でライブラリを入れ、Spark/Pythonで書く
- もし %sh で curl や wget を使ってデータを取得しているなら、Pythonのライブラリや Spark を直接使いましょう。
- B. dbutils.fs (cp/mv) を活用する
DBFSやマウント済みストレージ間でデータを移動させるなら、シェルコマンドの cp ではなく、Databricks専用のユーティリティを使います。これはバックグラウンドで最適化されています。
Spark Structured Streaming で使用される一般的なプログラミング モデルの特徴は?
- 無限に増え続けるテーブル (Unbounded Table)
- Structured Streamingでは、ストリーミングデータを「静的なテーブル」として扱います。
- 新しいデータが到着するたびに、そのデータは テーブルの新しい行として末尾に追加(Append) されていくという考え方
- インクリメンタル(増分)実行
Sparkは、入力テーブルに追加された新しいデータを自動的に検出し、前回の実行からの差分(増分)だけを処理します。 - 出力モード (Output Modes)処理結果を外部(ストレージやコンソール)に書き出す際、以下の3つのモードから選択できます。
- モード特徴
– Append (追加)新しく追加された行のみを出力する
– (最も一般的)。
– Complete (完全)毎回、結果テーブル全体を書き出す
– (集計処理などで使用)。
– Update (更新)
– 前回から変更があった行のみを更新・出力する。
- モード特徴
- 効率性
- 毎回データ全体を再計算するのではなく、新しく届いたデータのみを既存の結果に結合したり集計したりするため、低レイテンシで動作します。
実行例
Structured Streamingを使用するには、spark.readStream から始まり、最後に writeStream で出力モードを指定するのが基本の流れです。
# 1. 無限に増え続けるテーブルとして読み込み
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/mnt/data/input_logs")
# 2. インクリメンタルな変換処理
# (ここでは新しい行だけが自動的に処理対象になる)
processed_df = df.filter("action = 'click'").groupBy("user_id").count()
# 3. 出力モードの指定
query = processed_df.writeStream \
.outputMode("complete") \
.format("delta") \
.option("checkpointLocation", "/mnt/data/checkpoints") \
.start("/mnt/data/output_table")
そうじゃないバッチ処理コードの例
# readStream ではなく read を使う
df = spark.read \
.format("delta") \
.load("/mnt/data/user_logs") # 実行した瞬間に存在するデータだけを読み込む
# 処理を実行
result = df.filter("event = 'jump'")
# writeStream ではなく write を使う
# 1回書き込んで終了(新しいデータが来ても自動では動かない)
result.write \
.mode("overwrite") \
.save("/mnt/data/processed_results")
マウントされたオブジェクト ストレージに Python Wheel をアップロードするには、どの Databricks CLI コマンドを使用する必要がありますか?
- マウントされたオブジェクトストレージ(DBFS上のマウントポイント)に Python Wheel ファイル(.whl)をアップロードするには、
- Databricks CLI の databricks fs cp コマンドを使用します。
ジョブ実行履歴の保持期間とは何ですか?
- Databricks における ジョブ実行履歴の保持期間(Job Run Retention) とは、実行されたジョブの結果、ログ、および詳細な統計情報がシステム内に保存され、UIやAPIから確認できる期間のことです。
この期間を過ぎると、過去の実行データは自動的に削除されます。
デフォルトの保持期間
- Databricks の標準設定では、ジョブの実行履歴は以下の条件で保持されます。
- 期間: 60日間
- 件数制限: 同一ジョブにつき最大 5,000件 まで
PIIデータはトピック フィールドごとにパーティション分割し、ACL と削除ステートメントでパーティション境界を活用できるようにする必要がある?
- はい、その設計方針は非常に理にかなっており、Databricksやデータレイクの運用において「ベストプラクティス」の一つと言えます。
- 主な理由は以下の3点です。
- 削除ステートメント(GDPR/改正個人情報保護法対応)の最適化
- ACL(アクセス制御)の簡素化
- パーティション・プルーニングによる性能向上
テーブルの合計サイズが 10 TB を超えているにもかかわらず、ほとんどのファイルは 64 MB 未満である理由はどんなものがある?
- テーブルの合計サイズが 10 TB もあるのに、個々のファイルが 64 MB 未満(小さなファイルが大量にある状態)である場合、データプラットフォームのパフォーマンスを著しく低下させる「Small File Problem(小さなファイル問題)」が発生しています。
- 過剰なパーティション化 (Over-partitioning)
最も一般的な原因です。高カーディナリティ(値の種類が非常に多い)カラムでパーティションを分割すると、データが細切れになります。
- 例
- 例えば user_id や timestamp(秒単位)などでパーティションを分けると、1つのパーティション(フォルダ)あたりのデータ量が極端に少なくなります。
変更フィード機能のメリットとよく使われるケースは?
- Databricks(Delta Lake)の 変更データフィード(Change Data Feed: CDF) は、テーブルに対して行われた「挿入・更新・削除」の履歴を、変更前の値と後の値を含めて正確に追跡できる機能です。
単に最新のデータを取得するだけでなく、「何がどう変わったか」という変化のプロセスを活用できるのが最大の強みです。 - 変更データフィード(CDF)の主なメリット
- 増分処理の効率化 : テーブル全体をスキャンし直す必要がなく、前回の処理以降の「変更分」だけをピンポイントで取得できるため、コンピュートコストを大幅に削減できます。- 変更種別の判別 : そのデータが「新規追加(Insert)」なのか「既存の更新(Update)」なのか、あるいは「削除(Delete)」されたのかを、システム列(_change_type)によって一目で判別できます。
- ビフォー・アフターの把握 : 更新(Update)の場合、更新前の値(update_preimage)と更新後の値(update_postimage)を同時に取得できるため、数値の差分計算などが容易になります。
- よく使われるケース(ユースケース)
- メダリオン・アーキテクチャでの下流への伝播
Bronze(生データ)から Silver(クレンジング済み)、Gold(集計済み)へとデータを流す際、Silverテーブルで発生した「古いデータの削除」や「過去データの修正」を下流のGoldテーブルに反映させるために使用されます。 - マテリアライズド・ビューの更新
複雑な集計(合計値や平均値など)を行っているテーブルにおいて、一部のデータが変わった際に、その差分だけを使って集計結果を「インクリメンタルに更新」する場合に非常に有効です。 - 外部システムへの同期(CDC)
Databricks内のデータが更新されたことをトリガーに、外部の検索エンジン(Elasticsearch等)や基幹システム、通知サービスに「どのレコードがどう変わったか」を正確に通知・同期する際に利用されます。 - 監査とコンプライアンス
「誰が、いつ、どの値を、何から何へ書き換えたか」という詳細な監査ログとして活用できます。特にPII(個人情報)の変更履歴を追跡する必要がある場合に強力な武器となります。
- メダリオン・アーキテクチャでの下流への伝播
変更データフィードの有効化の仕方
-- テーブル作成時に有効化 CREATE TABLE student_scores (id INT, name STRING, score INT) TBLPROPERTIES (delta.enableChangeDataFeed = true); -- 既存のテーブルで有効化 ALTER TABLE student_scores SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
2. 変更履歴を読み取る方法
CDFが有効になると、通常のデータに加えて以下の4つのメタデータ列が記録されます。
- _change_type: 変更の種類
- insert
- update_preimage
- update_postimage
- delete
- _commit_version
- 変更時のバージョン番号
- _commit_timestamp
- 変更時のタイムスタンプ
-- バージョン0から最新までの変更履歴を取得
SELECT * FROM table_changes('student_scores', 0);
-- 特定の期間の変更を取得
SELECT * FROM table_changes('student_scores', '2026-03-01 00:00:00');
table_changes はCDFが有効なテーブルに対してのみ使える特別な関数です。
範囲指定のパターン
- バージョン指定
- table_changes(‘my_table’, 10, 15)
- バージョン10から15までの変更。
- table_changes(‘my_table’, 10, 15)
- 最新まで
- table_changes(‘my_table’, 10)
- バージョン10から現在までの全変更。
- table_changes(‘my_table’, 10)
- 時間指定
- table_changes(‘my_table’, ‘2026-03-01 10:00:00’)
- その時刻以降の変更。
- table_changes(‘my_table’, ‘2026-03-01 10:00:00’)
Databricksウィジェットとは?どんなものがある?
- Databricksウィジェットは、ノートブックをインタラクティブなダッシュボードに変えるための非常に便利なツールです。
一言で言えば、「コードを直接書き換えずに、画面上の入力フォームから変数(パラメーター)を渡す仕組み」のことです。これを使うことで、エンジニア以外の人でも値を変更して分析結果を再計算させることが可能になります。 - ウィジェットでできること
- 動的なフィルタリング
- 日付や地域を選択してグラフを更新する。
- 変数の共通化
- パイプライン全体で使う設定値(ファイルパスなど)を一箇所で管理する。
- ジョブの引数受け取り
- 外部のワークフロー(Azure Data Factoryなど)からパラメーターを受け取って実行する。
- 動的なフィルタリング
ウィジェットの種類(全4種)
| ウィジェット名 | 特徴 | 適した用途 |
|---|---|---|
| テキスト (text) | 自由な文字列を入力できるボックス。 | 名前、ID、特定のキーワード検索など。 |
| ドロップダウン (dropdown) | あらかじめ設定したリストから1つを選択。 | 都道府県、部署名、ステータス(ON/OFF)など。 |
| コンボボックス (combobox) | リストから選ぶか、自由入力も可能。 | 候補はあるが、例外的な値も入れたい場合。 |
| マルチ選択 (multiselect) | リストから複数の値をチェックボックス形式で選択。 | 複数地域の同時分析、特定カテゴリの絞り込み。 |
ウィジェットの基本的な使い方(Python例)
ウィジェットを作成・操作するには、dbutils.widgets を使用します。
ウィジェットの作成
# ドロップダウンを作成する例
dbutils.widgets.dropdown("region", "Tokyo", ["Tokyo", "Osaka", "Nagoya"], "地域を選択してください")
# テキストボックスを作成する例
dbutils.widgets.text("user_id", "001", "ユーザーIDを入力")
値の取得
作成したウィジェットに入力された値をコード内で使うには get メソッドを使います。
selected_region = dbutils.widgets.get("region")
print(f"選択された地域は: {selected_region}")
ウィジェットの削除
dbutils.widgets.remove("region") # 特定のものを削除
dbutils.widgets.removeAll() # 全て削除
新しいテーブルから選択したフィールドにエイリアスを設定することで、元のデータスキーマとテーブル名を維持するビューを作成とは?
一言でいうと、 「物理的なデータ構造(元テーブル)は変えずに、利用者に見せる窓口(ビュー)だけ、項目名を分かりやすくしたり、特定の列だけに絞ったりして提供する」 ということを指します。
具体的にどういうことか解説します。
元々のテーブルにあるカラム名(列名)がシステム的な名前(例: cust_id_01)だった場合、ビューを作成する際に人間が理解しやすい名前(例: customer_id)に変換するケースです。
SQLのイメージ:
CREATE VIEW sales_v AS SELECT cust_id_01 AS customer_id, -- エイリアスを設定 sls_amt AS sales_amount -- エイリアスを設定 FROM raw_data.sales_table; -- 元のデータスキーマとテーブル名
- 「元のデータスキーマとテーブル名を維持する」の意味
これは、 「元のデータ(ソース)を一切いじらない」 という点が重要です。- データの実体: 元のスキーマ(例: raw_data)にある元のテーブル(例: sales_table)の中に、元のカラム名のまま残り続けます。
- ビューの役割: ビューはあくまで「定義」なので、元のデータをコピーすることなく、参照する際の見え方だけを変えています。
- なぜこれを行うのか(メリット)
- ビジネスフレンドリーな命名
- エンジニアが作った複雑な物理名のテーブルを、アナリストが使いやすい「日本語名」や「標準的な英語名」に変えて提供できます。
- インパクトの最小化
- 物理テーブルのカラム名がシステムの都合で変わっても、ビュー側でエイリアスを調整すれば、そのビューを使っているダッシュボード(TableauやPower BIなど)を修正せずに済みます。
- 不要な列の隠蔽
- 元テーブルには100列あっても、ビューで特定の10列だけにエイリアスをつけて選択(SELECT)すれば、利用者は迷わずに済みます。
- 互換性の維持
- 古いシステムで使っていたカラム名をエイリアスとして維持することで、移行期間中のツールを壊さずに新しいテーブル構造へ移行できます。
- ビジネスフレンドリーな命名
Delta Lake は、クエリ フィルターに基づいてデータのスキップに活用される各テーブルの最初の何列の統計を自動的に収集する?
Delta Lakeがデータスキップ(Data Skipping)のために自動的に統計情報を収集するのは、テーブルの最初の32列です。
32列の統計収集の仕組み
Delta Lakeは、テーブルにデータを書き込む際、各データファイル(Parquetファイル)に含まれる値の最小値 (minimum)、最大値 (maximum)、およびNull値の数を記録します。
SQLのDESCRIBE EXTENDEDコマンドとは
Databricks(およびApache Spark SQL)におけるDESCRIBE EXTENDEDコマンドは、テーブルの基本構造(カラム名やデータ型)に加えて、より詳細なメタデータ(格納場所やテーブル形式など)を表示するためのコマンドです。
通常のDESCRIBE(またはDESC)ではカラム情報しか表示されませんが、EXTENDEDを付けることで「このテーブルがどこにあるのか?」「外部テーブルか管理テーブルか?」といった運用に不可欠な情報を一気に確認できます。
取得できる主な情報
このコマンドを実行すると、通常のカラムリストの下に 「Detailed Table Information」 というセクションが表示され、以下の項目が確認できます。
| 項目名 | 内容 |
|---|---|
| Database | テーブルが所属するデータベース(スキーマ)名。 |
| Table | テーブル名。 |
| Owner | テーブルの所有者(作成者)。 |
| Created Time | 作成日時。 |
| Last Access | 最終アクセス日時。 |
| Created By | 作成したSparkのバージョンなど。 |
| Type | MANAGED(管理テーブル)か EXTERNAL(外部テーブル)か。 |
| Location | 実際のデータファイルが保存されているクラウドストレージ(S3/ADLS/GCS)のパス。 |
| Provider | データフォーマット(delta, parquet, csv など)。 |
| Table Properties | delta.minReaderVersion や delta.dataSkippingNumIndexedCols などのカスタム設定。 |
保存された秘密を反復処理して各文字を出力するの反復処理とは?
具体的なイメージ(Python例)
例えば、秘密のパスワードが TopSecret だったとします。これを「反復処理して各文字を出力する」コードは以下のようになります。
# 保存された秘密
secret = "TopSecret"
# 反復処理(forループ)
for char in secret:
print(char)
構造化ストリーミングでは、なぜクエリに変更を加える際(新しいフィールドの追加やスキーマの変更など)は、新しいcheckpointLocationを指定する必要がある?
構造化ストリーミング(Structured Streaming)において、チェックポイント(checkpointLocation)は、 「クエリの家計簿(状態管理)」 のような役割を果たしています。
クエリに破壊的な変更(新しい列の追加や集計ロジックの変更)を加えた際に、なぜ新しいパスを指定する必要があるのか、その理由は 「不整合(矛盾)」を防ぐため です。
- チェックポイントの中身
チェックポイントディレクトリには、単なるデータの進行状況(オフセット)だけでなく、以下の重要な情報が保存されています。
- Offset: どこまでデータを読み込んだか。
- Commit: どのデータの書き込みが完了したか。
- Schema: 実行時のデータの構造。
- State: 集計(Window関数やGroup By)の途中の計算結果。
なぜ同じ場所を使い回せないのか
もしスキーマを変更したのに、古いチェックポイントを使い回そうとすると、Sparkは以下のようなパニック(エラー)を起こします。
- スキーマのミスマッチ
- 「保存されている家計簿には『名前』と『金額』しかないのに、新しいコードには『税率』という列がある。どう計算を再開していいか分からない!」となります。
- 状態(State)の非互換性
- 前回の集計結果(State)が「2列構成」で保存されている場合、新しい「3列構成」のコードでその続きを計算しようとすると、バイナリレベルでデータの不整合が発生します。
チェックポイントを新しくするタイミング
以下のような変更を行う場合は、基本的に 新しい checkpointLocation を指定する(または既存のフォルダを削除する) 必要があります。
| 変更内容 | 理由 |
|---|---|
| 列の追加・削除 | 保存されているスキーマ情報と一致しなくなるため。 |
| 集計キーの変更 | 内部で保持している「状態(State)」の構造が変わるため。 |
| UDF(ユーザ定義関数)の変更 | ロジックが変わると、中間結果との整合性が取れなくなるため。 |
| 出力モードの変更 | append から complete への変更などは管理方法が異なるため。 |
テーブルスキーマを更新するには、追加するフィールドごとにデフォルト値を指定しない場合どうなる?
- 既存の行には「NULL」が入る
ストリーミングデータフレームと静的テーブルの結合コマンド例は?
- 静的テーブルが右側にくるように結合するのが一般的です
Databricks High Concurrency クラスターとは?
Databricksの High Concurrency(高並列処理)クラスターとは、複数のユーザーが同時にリソースを共有してクエリを実行することに特化したクラスタータイプです。
主に、データサイエンティストやアナリストがチームで1つのクラスターを使い、SQL、Python、Rなどでインタラクティブに分析を行う「共有環境」として設計されています。
- 主な特徴とメリット
High Concurrencyクラスターが「標準(Standard)」クラスターと異なる点は、以下の通りです。
| 特徴 | 内容 |
|---|---|
| リソースの最適化 | クエリごとにリソースを細かく割り当て、1人の重いクエリがクラスター全体を占有するのを防ぎます。 |
| ユーザー分離 (Isolation) | 複数のユーザーが同じクラスターで作業しても、お互いの変数や関数が干渉しないようプロセスが分離されています。 |
| オートスケーリング | 実行待ちのクエリが増えると自動的にワーカーノードを追加し、負荷が減ると削除してコストを抑えます。 |
| 多言語サポート | SQL、Python、R をサポートしていますが、Scala はアーキテクチャ上の制限によりサポートされていません。 |
High Concurrency と Standard の比較
用途に合わせてどちらを選ぶべきか、以下のテーブルで比較できます。
| 項目 | High Concurrency (高並列) | Standard (標準) |
|---|---|---|
| 主な用途 | 複数人でのインタラクティブな分析、BIツール接続 | 1人での開発、複雑なETLジョブの実行 |
| 同時実行性 | 非常に高い(多数の小さなクエリに最適) | 低い(1つの大きな処理にリソースを集中) |
| サポート言語 | SQL, Python, R | SQL, Python, R, Scala |
| セキュリティ | テーブルレベルのアクセス制御が可能 | ユーザー間での完全な分離は限定的 |
マルチタスク ジョブでタスクとして実行するように構成されたノートブックを確認するには、どんな REST API 呼び出しが必要?
マルチタスク ジョブにおいて、どのノートブックがどのタスクに割り当てられているかを確認するには、Jobs API の
GET /api/2.1/jobs/get
を使用します。
このエンドポイントを呼び出すことで、ジョブ内の全タスクの依存関係や、それぞれのタスクが参照しているノートブックのパスを詳細に取得できます。
使用する API エンドポイント
| 項目 | 内容 |
|---|---|
| エンドポイント | GET /api/2.1/jobs/get |
| 必須パラメータ | job_id (確認対象のジョブID) |
| 取得できる主な情報 | 各タスクの実行順序、ノートブックのパス、引数、クラスター設定など。 |
チェックポイント ディレクトリは、結合に存在する一意のキーの状態情報を追跡するために使用されるとはどういう意味?
構造化ストリーミング(Structured Streaming)における ストリーム間結合(Stream-Stream Join) では、チェックポイントディレクトリが非常に重要な役割を果たします。
「一意のキーの状態情報を追跡する」とは、簡単に言うと 「まだ相方が現れていないデータを、後で結合するために一時保管しておく」 仕組みのことです。
なぜ「状態(State)」の追跡が必要なのか?
静的なテーブル同士の結合(Batch Join)とは異なり、ストリーミングではデータがバラバラのタイミングで届きます。
- 例: 「広告の表示ログ」と「クリックログ」を結合する場合
- 課題
- 表示ログが届いた瞬間には、まだクリックが発生していない(クリックログが届いていない)可能性があります。
- 解決策
- 表示ログを 「状態(State)」 としてチェックポイントに保存しておき、後でクリックログが届いたときに「あ、これはさっき届いた表示ログと一致するな」と照合できるようにします。
- 課題
Delta Lake の変更データ フィードとTime Travel 機能の違いとは
Databricksにおける 「変更データ フィード (Change Data Feed: CDF)」 と 「Time Travel (タイムトラベル)」 は、どちらもデータの履歴に関わる機能ですが、その目的と仕組みは大きく異なります。
一言でいうと、 Time Travelは「過去の断面を見る」 ためのもので、 CDFは「何が変わったかの差分を取り出す」 ためのものです。
主な違いの比較
| 項目 | Time Travel (タイムトラベル) | 変更データ フィード (CDF) |
|---|---|---|
| 主な目的 | 過去の特定の時点のデータを再現・復旧する。 | 変更された行(挿入・更新・削除)のみを抽出して後続へ流す。 |
| 出力内容 | その時点の「テーブル全体」の状態。 | 変更された「差分」と、その変更の種類(Insert/Update/Delete)。 |
| 更新の追跡 | 更新前の値は分かるが、行単位で「何が起きたか」の判別は困難。 | 更新前の値(pre-image)と更新後の値(post-image)を明確に区別して保持。 |
| 有効化設定 | デフォルトで有効(Deltaテーブルの標準機能)。 | 明示的に有効化が必要 (delta.enableChangeDataFeed = true)。 |
pipelines.reset.allowed propertyとは
pipelines.reset.allowed は、Databricksの Delta Live Tables (DLT) において、テーブルの再計算(リセット)を許可するかどうかを制御するためのプロパティです。
通常、DLTパイプラインで「リセット」を実行すると、既存のデータが削除され、ソースからすべてのデータが再読み込み・再処理されます。このプロパティを false に設定することで、意図しないデータの消失や、コストのかかる全件再処理を防ぐことができます。
主な役割と動作
| 設定値 | 動作 |
|---|---|
true (デフォルト) |
パイプライン設定から「リセット」を実行可能。既存のテーブルデータは削除され、一から作り直されます。 |
false |
リセット操作が禁止されます。リセットしようとするとエラーが発生し、データは保護されます。 |
なぜこのプロパティが必要なのか?
大規模なデータ基盤や本番環境では、以下のリスクを回避するために false に設定することが推奨されます。
高額な再計算コスト: 数テラバイト、数ペタバイトあるテーブルを誤ってリセットすると、再処理に膨大なコンピューティングコストと時間がかかります。
外部システムからの CDC データを自動的に処理とは
外部システムからの CDC (Change Data Feed / Change Data Capture) データの自動処理とは、データベース(SQL Server, Oracle, MySQLなど)で発生した「挿入・更新・削除」の履歴をリアルタイムに検知し、Databricks上のDelta Lakeへ自動的に反映させる仕組みを指します。
Databricksでは、主に Delta Live Tables (DLT) の
APPLY CHANGES INTOという機能を使って、この複雑な処理を簡潔に実装します。
処理の全体像(フロー)
外部システムからデータが届き、Deltaテーブルに反映されるまでの標準的なステップは以下の通りです。
| ステップ | 内容 |
|---|---|
| 1. 検知 (Capture) | 外部DBのログから「どの行がどう変わったか」を抽出する(DebeziumやFivetranなどを使用)。 |
| 2. 取り込み (Ingestion) | 変更ログをメッセージキュー(Kafka, Azure Event Hubsなど)経由でDatabricksにストリーミング。 |
| 3. 変換 (Transform) | 届いたログ(Insert/Update/Deleteのフラグ付き)を解析。 |
| 4. 反映 (Apply) | APPLY CHANGES INTO を使い、ターゲットテーブルに最新状態を書き込む(マージ)。 |
なぜ「自動処理」が必要なのか?(課題と解決)
手動でSQLの MERGE 文を書くことも可能ですが、CDCデータの処理には以下のような特有の難しさがあります。
| 課題 | 自動処理(DLTの APPLY CHANGES)による解決 |
|---|---|
| 順序の制御 | 同じ行に対する「更新→削除」が逆転して届いても、シーケンス番号やタイムスタンプを見て正しく処理します。 |
| 削除の扱い | ソース側で削除されたレコードを、ターゲット側でも自動的に物理削除または論理削除します。 |
| スキーマ進化 | ソース側に新しい列が増えた際、自動的にターゲットテーブルの定義も拡張できます。 |
| 計算の重複排除 | 短時間に同じIDの更新が複数回届いた場合、最新の状態だけを適用して効率化します。 |
述語プッシュダウンと物理プランの関係性
述語プッシュダウン(Predicate Pushdown)と物理プラン(Physical Plan) の関係は、一言でいえば「クエリの実行効率を最大化するために、データ読み込みの初期段階でフィルタリングを組み込む最適化プロセス」のことです。
Sparkのクエリ実行エンジン(Catalyst Optimizer)が、私たちが書いたSQLやDataFrameの処理をどのように解釈し、物理的な動作に変換するかという文脈で非常に重要になります。
述語プッシュダウンとは?
「述語(Predicate)」とは、SQLの WHERE 句や filter() 条件(例:age > 20)のことです。
通常、データをすべてメモリに読み込んでからフィルタリングするのではなく、「データソース(ストレージ)側」で可能な限り先に絞り込むことを「プッシュダウン(押し下げる)」と呼びます。
物理プランとの関係性
Sparkがクエリを実行する際、処理は「論理プラン」から「物理プラン」へと変換されます。述語プッシュダウンはこの過程で以下のように反映されます。
| フェーズ | 述語プッシュダウンの動き |
|---|---|
| 論理プラン (Logical Plan) | ユーザーが書いた通りに「データを読む」→「フィルターする」という順序で構成されます。 |
| 最適化プラン (Optimized Plan) | カタリスト最適化により、フィルター条件が「データを読む」処理のすぐ上、あるいは中へと移動します。 |
| 物理プラン (Physical Plan) | ここが核心です。 実際にファイル(ParquetやDelta)を読み取る FileScan 操作の中に、フィルター条件が埋め込まれます。 |
pyspark.sql.functions.broadcast とは
pyspark.sql.functions.broadcast は、Sparkの結合(Join)処理を劇的に高速化するための関数です。
一言でいうと、 「小さい方のテーブルを全ワーカーノードに丸ごとコピー(放送)することで、ネットワーク経由の重いデータ移動(シャッフル)を回避する」 仕組みです。
- 動作の仕組み:Broadcast Hash Join
通常の結合(Sort Merge Join)では、結合キーに基づいて両方のテーブルのデータをネットワーク越しに並べ替える「シャッフル」が発生します。これがボトルネックになります。
broadcast を使用すると以下の流れに変わります。
- 収集: 小さいテーブルをドライバーノードに一度集める。
- 配布: そのデータを全ワーカーノード(Executor)にコピーして配信する。
- ローカル結合: 各ワーカーが、手元にある大きなテーブルの断片と、配布された小さいテーブルをその場で結合する。
メリットとデメリット
| 項目 | メリット | デメリット・リスク |
|---|---|---|
| パフォーマンス | 大規模なデータ移動(シャッフル)がなくなるため、非常に高速。 | メモリ不足(OOM)のリスク。大きすぎるテーブルを放送するとExecutorがクラッシュします。 |
| リソース | ネットワーク帯域の消費を抑えられる。 | ドライバーノードと全Executorのメモリを消費する。 |
- 基本的な使い方
from pyspark.sql.functions import broadcast # large_df は数億行、small_df は数千行と想定 # small_df を全ノードに配布して結合する result_df = large_df.join(broadcast(small_df), "id")
スパーク パーティションのサブセットに割り当てられたデータが増えることで発生するスキューとは
Sparkにおける スキュー(Data Skew / データ歪み) とは、特定のパーティションにデータが極端に偏ってしまい、一部のワーカーノードだけが過負荷になる現象を指します。
分散処理の理想は「全ノードが均等に仕事を終えること」ですが、スキューが発生すると、 「ほとんどのノードは暇なのに、一つのノードだけが延々と処理を続けている」 という状態になり、全体の処理時間がその遅いノードに引きずられてしまいます。
スキューが発生する仕組み
Sparkはデータを「パーティション」という単位に分割して並列処理します。通常、ハッシュ関数などを用いて均等に分配しようとしますが、特定のキーにデータが集中していると偏りが発生します。
| 状態 | データの分配 | 処理の様子 |
|---|---|---|
| 正常(均等) | 全てのパーティションがほぼ同じサイズ(例:各100MB)。 | 全ノードが同時に処理を終え、効率が最大化される。 |
| スキュー発生 | 特定のパーティションだけ巨大(例:1つだけ10GB、他は10MB)。 | 巨大なパーティションを担当するワーカーが終わるまで、全体のジョブが終わらない。 |
なぜスキューが起きるのか?(主な原因)
| 原因 | 内容 |
|---|---|
| 特定のキーの集中 | 結合(Join)や集計(GroupBy)の際、特定のID(例:NULL や default、超大手顧客のID)にデータが数億件集中している。 |
| 不適切なパーティション設計 | データのカーディナリティ(値の種類)が低い列をパーティションキーに選んでしまった。 |
| データの特性 | そもそも現実のデータが「特定の日にだけ集中している」といった偏りを持っている。 |
スキューが引き起こす問題
- ジョブの長時間化
- 99%のタスクが数秒で終わるのに、残りの1%が数時間かかる。
- メモリ不足 (OOM)
- 特定のワーカーのメモリにデータが入り切らず、Executorがクラッシュする。
- リソースの浪費
- 一部のノードを待っている間、他のノードの計算リソースがアイドル状態(無駄)になる。
データ品質ルールをパイプラインのターゲットスキーマ外のDeltaテーブルに保持する とは?
- 構成のイメージ
- ターゲット(出力先): 実際に業務で使う「売上テーブル」や「顧客テーブル」など。
- 管理用Deltaテーブル(外出し先): 「どのテーブルの、どのカラムに、どんなチェック(NULL禁止など)をかけるか」という設定データだけを格納するテーブル。
- 場所: 業務データとは別の管理用スキーマ(例: metadata_db.quality_rules)に配置します。
- なぜ「外」に保持するのか(メリット)
- コードの変更なしでルールを更新できる
- 新しく「この列にマイナスの値が入らないようにしたい」となった場合、Python/SQLのコードを書き換えてデプロイし直す必要がありません。管理用テーブルに1行追加するだけで、次回の実行から適用されます。
- 非エンジニアでもルール管理が可能
- データアナリストやビジネス担当者が、管理用テーブル(あるいはそれを編集するGUI)を通じて、自分たちで品質ルールを調整できるようになります。
3.ルールの一元管理と再利用
- 複数のパイプラインで同じ「住所フォーマットチェック」などを使う場合、一つのテーブルで定義しておけば、各パイプラインがそれを参照するだけで済みます。
例
from pyspark.sql import functions as F
def apply_quality_rules(df, target_table_name):
# 1. 外部テーブルから該当テーブルのルールを取得
rules_df = spark.table("metadata.quality_rules") \
.filter(F.col("table_name") == target_table_name) \
.collect()
# 2. ルールを適用
# ここでは「検品カラム」を追加し、NGなレコードにフラグを立てる例
for rule in rules_df:
rule_name = f"is_failed_{rule['rule_id']}"
# 条件に合致しない(品質違反)場合に True
df = df.withColumn(rule_name, F.expr(f"NOT ({rule['condition']})"))
return df
使用例
ルールテーブル例
| rule_id | table_name | column_name | rule_type | condition (SQL式) | expectation_level |
|---|---|---|---|---|---|
| 1 | sales_gold |
order_id |
NOT_NULL |
order_id IS NOT NULL |
FAIL |
| 2 | sales_gold |
amount |
POSITIVE |
amount > 0 |
WARN |
| 3 | sales_gold |
status |
VALID_LIST |
status IN ('ordered', 'shipped') |
WARN |
| 4 | player_logs |
pos_x |
STAGE_OUT |
pos_x >= 0 |
WARN |
| 5 | player_logs |
hp |
MAX_CAP |
hp <= 999 |
WARN |
ルールテーブルの作成例
-- 品質ルールを管理するマスターテーブル
CREATE TABLE IF NOT EXISTS metadata.quality_rules (
rule_id INT,
table_name STRING, -- 対象テーブル名
column_name STRING, -- 対象カラム名
rule_type STRING, -- ルール種別(NOT_NULL, RANGE, etc.)
condition STRING, -- SQLの条件式
expectation_level STRING -- 'FAIL' (停止), 'WARN' (警告のみ)
) USING DELTA;
-- サンプルデータの挿入
INSERT INTO metadata.quality_rules VALUES
(1, 'sales_gold', 'order_id', 'NOT_NULL', 'order_id IS NOT NULL', 'FAIL'),
(2, 'sales_gold', 'amount', 'POSITIVE_VALUE', 'amount > 0', 'WARN');
上記の活用と違反情報の出力
from pyspark.sql import functions as F
def apply_quality_rules(df, target_table_name):
# 1. 外部テーブルから該当テーブルのルールを取得
rules_df = spark.table("metadata.quality_rules") \
.filter(F.col("table_name") == target_table_name) \
.collect()
# 2. ルールを適用
# ここでは「検品カラム」を追加し、NGなレコードにフラグを立てる例
for rule in rules_df:
rule_name = f"is_failed_{rule['rule_id']}"
# 条件に合致しない(品質違反)場合に True
df = df.withColumn(rule_name, F.expr(f"NOT ({rule['condition']})"))
return df
# 使用例
raw_df = spark.table("stg_sales")
validated_df = apply_quality_rules(raw_df, "sales_gold")
# 違反があるレコードを特定
failed_records = validated_df.filter("OR ".join([f"is_failed_{r['rule_id']}" for r in rules_df]))
Delta Live Tablesの概要と使用例
簡単に言うと、「データの流れ(パイプライン)をSQLやPythonで宣言するだけで、構築・運用をDatabricksが自動化してくれるフレームワーク」のことです。
Delta Live Tables (DLT) とは?
通常、データパイプラインを作るには「テーブル作成」「データの読み込み」「変換(加工)」「エラーハンドリング」「リトライ処理」などを細かく実装し、ジョブとしてスケジュールする必要があります。
DLTでは、これらを 「どのような状態のテーブルを作りたいか(宣言的)」 と記述するだけで、以下の作業を肩代わりしてくれます。
- インフラの自動管理
- 実行時にクラスターを起動し、終わったら停止。負荷に応じてスケール。
- 依存関係の自動解決
- AテーブルができてからBテーブルを更新する、といった順序を自動判定。
- データ品質の管理 (Expectations)
- 今回話題にしている「品質チェック」を標準機能でサポート。
- リネージ(履歴)の可視化
- データの流れをGUIで確認可能。
DLTの使用例(SQLとPython)
DLTは、主に Medallion Architecture(メダリオン・アーキテクチャ) と呼ばれる、Bronze(生データ)→ Silver(クレンジング)→ Gold(集計)の流れを構築するのに最適です。
A. SQLでの例(宣言的な記述)
SQLの場合、LIVE キーワードを付けるだけでパイプラインの一部になります。
-- 1. Bronze: 生データを読み込む(オートローダー機能)
CREATE OR REFRESH STREAMING LIVE TABLE sales_raw
AS SELECT * FROM cloud_files("/mnt/data/sales_json", "json");
-- 2. Silver: 品質ルールを適用してクレンジング
CREATE OR REFRESH STREAMING LIVE TABLE sales_cleaned (
CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.sales_raw;
B. Pythonでの例
先ほど作成した「外部テーブルのルール」を読み込む仕組みと非常に相性が良いです。
<br />import dlt
from pyspark.sql import functions as F
@dlt.table
def sales_gold():
# 外部のDeltaテーブルからルールを取得して適用するなどの柔軟な処理が可能
return spark.readStream.table("LIVE.sales_cleaned") \
.groupBy("category") \
.agg(F.sum("amount").alias("total_sales"))
なぜDLTを使うのか?(メリット)
- 品質チェックが楽
- EXPECT (条件) ON VIOLATION DROP ROW と書くだけで、不正なデータを自動的に除外(または警告)できます。
- ストリーミングとバッチの融合
- データが届くたびに更新する「ストリーミング」も、定期的な「バッチ」も、同じコードで動かせます。
- エラー復旧
- パイプラインが途中で止まっても、チェックポイントから自動で再開してくれます。