はじめに
DatabricksのCertified Data Engineer Professional試験でよく問われる項目、
理解が甘かったので学習しなおした項目をまとめました。
項目ごとに簡潔に分けているので、ざっと振り返りたい場合などにぜひご活用ください
単純にDatabricksの便利機能を知りたい場合も活用できると思います
Liquid ClusteringとZ-Orderの違い
| 比較項目 | Z-Order (従来の方式) | Liquid Clustering (最新) |
|---|---|---|
| 導入バージョン | 古くから利用可能 | Databricks Runtime 13.3 以降 |
| 主な目的 | 多次元データのフィルタリング高速化 | 書き込み/読み取りの両立と運用の自動化 |
| メンテナンス | 手動で OPTIMIZE を頻繁に実行 |
増分最適化により管理コストが大幅に低下 |
| 書き込み性能 | データの並べ替えにより負荷が高い | 効率的なクラスタリングで書き込みが速い |
| パーティショニング | 通常、物理パーティションと併用 | 物理パーティションを不要にする(推奨) |
| 柔軟性 | カラム構成の変更には全データの再構築が必要 | クラスタリングキーを動的に変更可能 |
| 適用推奨 | 静的な小規模データ、古い環境 | 大規模データ、頻繁な更新、高多次元フィルタ |
- 「ファイルの断片化」や「スキュー(偏り)」 という単語が出てきたら、Liquid Clustering が正解の可能性が高いです。
- 「1つのカラムだけでなく、複数のカラムで効率的に絞り込みたい」 という文脈であれば、両方の手法が該当しますが、最新のベストプラクティスを問われているなら Liquid Clustering です。
mergeSchema=trueはどういう時使うべき?
| 項目 | 内容 |
|---|---|
| 主な用途 | 既存のテーブルに、新しいカラム(列)を含むデータを追加・更新する場合 |
| デフォルト挙動 | Spark/Delta Lakeはデフォルトでスキーマ不一致をエラーにする(保護機能) |
| 実行コマンド例 | df.write.format("delta").mode("append").option("mergeSchema", "true").save(path) |
| 発生する処理 | 書き込みと同時に、ターゲットテーブルのメタデータに新カラムを自動追加する |
| 制約・注意点 | 既存のカラムのデータ型変更(例: String → Int)には使用できない(上書きが必要) |
| 推奨シーン | Bronze層など、ソースデータの項目が頻繁に増える環境でのデータ取り込み |
- 新しいカラムの追加 (Append)
ソースデータに新しい列(例:new_feature)が追加された際、このオプションがないと「Schema mismatch」エラーで停止します。trueにすることで、既存のテーブル構造を動的に拡張します。 - Merge文での利用
SQLのMERGE INTO操作を行う際、ソース側にしかないカラムをターゲットに反映させたい場合にSET T.new_col = S.new_colと記述する代わりに、スキーマ進化を許可して自動でカラムを作成させます。 - 構造化ストリーミング (Structured Streaming)
ストリーム処理中にソースのスキーマが変わる可能性がある場合、チェックポイントと組み合わせてスキーマの変更を許容するために使用します。
expectとexpect_or_dropの違い
| 構文 (Expectation) | データが制約に違反した場合の挙動 | レコードの扱い | パイプライン全体 |
|---|---|---|---|
expect |
メトリクスにカウントされるが、処理は続行される | そのままターゲットに書き込まれる | 正常終了(警告のみ) |
expect_or_drop |
違反したレコードは破棄される | ターゲットには書き込まれない | 正常終了(ドロップのみ) |
expect_or_fail |
パイプラインが即座にエラーで停止する | 書き込み自体が失敗する | 異常終了 |
試験対策のポイント
試験では、「データ品質を確保しつつ、パイプラインの停止は避けたい」というシナリオが出ることがあります。その場合の正解は expect_or_drop です。
一方、「不正なデータが1件でも混入することを絶対に許さない」という要件であれば expect_or_fail を選びます。
expectとexpect_or_dropの活用例
具体的なパイプライン設計の例
1. expect の活用:データ品質のモニタリング
Bronze層からデータを読み込む際、ソースシステム側の不具合を早期発見するために使用します。
– シナリオ: ユーザーIDがたまに欠損するが、システムを止めたくない。
– コードイメージ:
CONSTRAINT valid_user_id EXPECT (user_id IS NOT NULL)
- 結果: 全レコードが保存され、Databricksの「Pipeline Details」画面で「何%のデータがNULLだったか」を視覚的に把握できます。
2. expect_or_drop の活用:データクレンジング
分析用のSilver層テーブルを作成する際に、後続のMLモデルや集計を壊さないために使用します。
- シナリオ: 年齢(age)がマイナスの値や200歳を超えるような、明らかに誤ったデータを除外したい。
- コードイメージ
CONSTRAINT realistic_age EXPECT (age > 0 AND age < 150) ON VIOLATION DROP ROW
- 結果: 正常な範囲のデータだけがテーブルに残り、異常なデータは自動的にフィルタリング(除外)されます。
試験で狙われる「落とし穴」
試験問題では、以下のような判断を求められることがあります。
- 「ドロップされたレコードはどこへ行くのか?」
- デフォルトでは、ドロップされたレコードの内容自体はターゲットテーブルには残りません。もしドロップされた中身も調査したい場合は、
expectを使って「有効フラグ列」を自作するか、Quarantine(隔離)用の別テーブルへ流すロジックを組む必要があります。
- デフォルトでは、ドロップされたレコードの内容自体はターゲットテーブルには残りません。もしドロップされた中身も調査したい場合は、
- 「複数の制約がある場合」
- 1つのテーブルに
expectとexpect_or_dropを混ぜて定義することも可能です。その場合、1つでもdrop条件に触れれば、そのレコードは破棄されます。
- 1つのテーブルに
expectすると、制約違反のデータはどこに行く?
| 構文 | データの行方 | メトリクス(ログ)の記録 |
|---|---|---|
expect |
ターゲットテーブルに保存される | 違反件数として記録される |
expect_or_drop |
破棄される(どこにも保存されない) | 破棄件数として記録される |
expect_or_fail |
保存されない(処理自体が失敗する) | 失敗の原因として記録される |
| Quarantine(隔離)パターン | 「隔離用テーブル」に保存される | ロジックとして実装が必要 |
expect の場合のデータの見え方
expect を設定したテーブルをクエリすると、制約に違反したレコードも正常なレコードと混ざって表示されます。
- 確認方法: DLTパイプラインのUI上にある「Data Quality」タブで、何パーセントのレコードが違反したかの統計を確認できます。
- 用途: 「とりあえずデータは全部入れておいて、後で
WHERE句で除外して分析する」といった運用に適しています。
2. 不正なデータを「別テーブル」に送りたい場合(隔離戦略)
試験や実務で「不正データを捨てたくないが、クリーンなテーブルにも入れたくない」という要件がある場合、DLTでは2つのテーブルに分岐させるロジックを手動で書く必要があります。
- クリーン用テーブル:
expect_or_dropを使用して、正しいデータのみを保持。 - 隔離(Quarantine)用テーブル: 逆に「不正な条件」を
expect(またはフィルタ)で抽出し、エラー調査用に保持。
expectやexpect_or_dropのデータや詳細データってどんな感じでアクセスする?
3. イベントログでの確認
「どのレコードがダメだったか」の統計情報は、Databricksが自動で管理するイベントログ(Event Log)という特別な場所に保存されます。
-- イベントログからデータ品質メトリクスをクエリする例
SELECT
id,
expectations.dataset,
expectations.name AS constraint_name,
expectations.passed_records,
expectations.failed_records
FROM (
SELECT
id,
explode(from_json(details:flow_progress:data_quality:expectations, 'array<struct<dataset:string, name:string, passed_records:int, failed_records:int>>')) AS expectations
FROM event_log
WHERE event_type = 'flow_progress'
)
例:
| id (実行ID) | dataset (テーブル名) | constraint_name (制約名) | passed_records (成功数) | failed_records (失敗数) |
|---|---|---|---|---|
a1b2-c3d4... |
raw_sales |
valid_id |
1000 |
0 |
a1b2-c3d4... |
raw_sales |
positive_price |
995 |
5 |
e5f6-g7h8... |
clean_users |
email_not_null |
450 |
2 |
Windowとwith句の関係性
1. WITH 句(Common Table Expression / CTE)の意味合い
WITH 句は、メインのクエリを実行する前に「仮のテーブル」を定義するようなイメージです。
– 意味合い: 「この複雑な計算結果を temp_table という名前で呼ぶことにする」という宣言。
– メリット: サブクエリ(入れ子構造)を排除できるため、上から下に流れるようにクエリを読めるようになります。
2. WINDOW 関数の意味合い
WINDOW 関数は、集計(GROUP BY)と異なり、「元の行を維持したまま」、その行に関連する周囲のデータを参照して計算します。
– 意味合い: 「現在の行の前後 3 行の平均を出したい」「部署内での自分の順位を知りたい」といった計算。
– メリット: JOIN を繰り返さなくても、行レベルのデータと集計値を横並びにできます。
3. WITH 句と WINDOW 関数を併用する具体例
実務や試験で最も多いパターンは、「WITH 句の中で WINDOW 関数を使って順位を付け、メインクエリでその順位を使ってフィルタリングする」という流れです。
SQL での記述例
「各部署で給与が高いトップ 3 人だけを抽出したい」という場合:
-- 1. WITH句でランキング付きの仮テーブルを作る
WITH ranked_employees AS (
SELECT
name,
dept,
salary,
-- WINDOW関数で部署ごとの順位を計算
DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank
FROM employees
)
-- 2. メインクエリで順位を使って絞り込む
SELECT * FROM ranked_employees
WHERE rank <= 3;
なぜ併用が必要なのか?
SQL の実行順序のルール上、WHERE 句の中で WINDOW 関数の結果(上記の rank)を直接使うことはできません。 そのため、一度 WITH 句で「順位という列を持ったテーブル」を確定させてから、外側でフィルタリングする必要があるのです。
| 機能 | 役割 | 主な目的 | 試験でのキーワード |
|---|---|---|---|
| WITH 句 (CTE) | 一時的な結果セットの定義 | 複雑なクエリの分割・可読性向上 | 再利用、クエリの整理、ネストの回避 |
| WINDOW 関数 | 行の「枠(窓)」の中での計算 | 移動平均、ランキング、累計の算出 | OVER, PARTITION BY, ORDER BY |
具体的な違い
1. SELECT で(サブクエリを使って)始める場合
一気に結果を出そうとするアプローチです。内側から外側へ読み解く必要があります。
SELECT name, dept, salary
FROM (
-- ここで一度ランキング付きの「中間状態」を作る
SELECT name, dept, salary,
RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rnk
FROM employees
)
WHERE rnk = 1;
テーブルの推移イメージ
- 元データ (
employees): 名前、部署、給与が並んでいる。 - サブクエリ内: 各行の横に
rnk列が計算されて付与される。 - 最終結果: 外側の
WHERE rnk = 1によって、1位以外の行が消える。
2. WITH 句で始める場合
「まずランキング表を作る」「次にそこから抽出する」というレシピのような手順で書くアプローチです。
- STEP 1: ランキング付きの「仮想テーブル」を定義 WITH RankedTable AS ( SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rnk FROM employees ) - STEP 2: 作成した仮想テーブルから1位を抽出SELECT name, dept, salary FROM RankedTable WHERE rnk = 1;
STEP 1: [RankedTable] の作成(メモリ上の仮想テーブル)
| name | dept | salary | rnk (WINDOW関数の結果) |
|---|---|---|---|
| 田中 | 営業 | 500,000 | 1 |
| 佐藤 | 営業 | 450,000 | 2 |
| 鈴木 | 開発 | 600,000 | 1 |
| 高橋 | 開発 | 580,000 | 2 |
STEP 2: [最終出力](WHERE rnk = 1 でフィルタ)
| name | dept | salary |
|---|---|---|
| 田中 | 営業 | 500,000 |
| 鈴木 | 開発 | 600,000 |
hash(email)と言う関数は存在する? 例えばEmailをhash化して、inputに対応したoutputを作りたいようなセキュリティが望まれる場合はどんな関数が望ましい?
まずhashは暗号化の関数ではない
以下が候補の関数
| 手法 | 関数名 (SQL/Python) | 特徴・メリット | 適したユースケース |
|---|---|---|---|
| 標準的なハッシュ | sha2(email, 256) |
高い一貫性と衝突耐性。業界標準。 | 永続的な匿名IDの作成、テーブル結合キー |
| 高速なハッシュ | md5(email) |
処理が速いが、セキュリティ強度は低い。 | 重複チェック、キャッシュキー |
| 暗号化(可逆) | aes_encrypt |
鍵があれば元のEmailを復元できる。 | 監査やトラブル時に元データが必要な場合 |
| マスキング | mask(email) |
一部を * で隠す。ハッシュではない。 |
ダッシュボードでの展示用 |
最大限のセキュリティ提供はoahthとサービスプリンシパル?
その通りです。セキュリティと管理の観点から、PATではなくサービスプリンシパルの利用が推奨されます
バンドルで作ったジョブができてる場合、runコマンドで実行可能?tオプション prodを使う?
databricks bundle run コマンドで実行可能です。また、ご質問の通り、ターゲット(環境)を指定するために -t(または --target)オプションを使用するのが標準的な手順です。
databricks bundle run の仕様とオプションはどんなものがある?
| コマンド / オプション | 役割 | 実行例 |
|---|---|---|
databricks bundle run |
バンドルで定義されたジョブを即座に実行する | databricks bundle run [job_key] |
-t (または --target) |
databricks.yml で定義した環境(dev, prod等)を指定する |
-t prod |
job_key |
resources/jobs 内で定義したジョブの識別名 |
my_sample_job |
--refresh-all |
パイプライン(DLT)などの状態をリフレッシュして実行する | --refresh-all |
もし、あなたの databricks.yml に prod というターゲットと、main_job というジョブが定義されている場合、コマンドは以下のようになります。
# prod環境のジョブを実行する場合 databricks bundle run -t prod main_job`
ワークフローで途中のタスクに勝手に必要パラメータ追加されてこけた場合、どういう再実行スタイルが望ましい?
Databricksのジョブ(ワークフロー)が途中のタスクで失敗し、特に「意図しないパラメータ追加」などの構成ミスが原因だった場合、最も効率的で望ましい再実行スタイルは 「失敗したタスクからの再実行(Repair Run)」 です。
| 再実行スタイル | 内容 | メリット | デメリット |
|---|---|---|---|
| Repair Run (失敗したタスクから再試行) | 失敗したタスクと、その下流のタスクのみを実行する | 時間と計算リソースの節約。成功済みのタスクを繰り返さない | 失敗原因が上流のデータ不備にある場合は解決しない |
| Run Now (すべて再実行) | ワークフローを最初からやり直す | データの整合性が完全に保証される | 成功していた重い処理(Bronze→Silver等)を再度回すためコストが高い |
| 手動ノートブック実行 | 個別のノートブックを開いて修正・実行 | 自由なデバッグが可能 | ジョブの依存関係やパラメータの引き継ぎが再現できない |
その場合の望ましい対応フロー
「パラメータ設定ミス」でコケた場合、以下の手順がベストプラクティスです。
1. 構成の修正: まず、ジョブの設定(あるいはBundlesのソースコード)から、問題となった余計なパラメータを削除・修正します。
2. Repair Runを選択:
– Databricks UIのジョブ実行詳細画面で 「Repair Run(修復実行)」 をクリックします。
– これにより、「修正後の設定」を使って、失敗したタスクから処理が再開されます。
3. 部分的なパラメータ上書き (必要に応じて):
– 再実行時に、その回だけの特定のパラメータを渡して「修復」することも可能です。
なぜ「Repair Run」が望ましいのか?
Databricksのワークフローは、各タスクが成功したという状態を保持しています。
- 効率: 既に完了している上流タスク(例:大規模なデータロード)をスキップできるため、復旧までの時間が最短になります。
- べき等性: データエンジニアリングでは、二重取り込み(Double Ingestion)を防ぐために、一度成功したタスクは繰り返さないのが基本です。
タスクとジョブはどっちがでかいくくり?
「ジョブ(Job)」の方が大きなくくりです。
1つの「ジョブ」という大きな箱の中に、実行したい個別の処理である「タスク(Task)」が複数入っている、という構造になっています。
ジョブとタスクの階層構造
| 単位 | 役割 | 具体例 |
|---|---|---|
| ジョブ (Job) | ワークフロー全体の管理単位。スケジュールや通知を管理する。 | 「日次売上集計パイプライン」 |
| タスク (Task) | ジョブ内で行われる個別の実行ステップ。 | 「データの抽出」「変換」「モデルの推論」 |
ジョブの実行コマンドはrun ? execute?
Databricks CLIやAPIにおいて、ジョブを動かす際の正式な動詞(コマンド)は run です。
execute というコマンドは存在しません。Databricks Asset Bundles (DABs) でも、通常の CLI でも run を使用します。
ジョブ実行に関するコマンド体系
| コンテキスト | 実行コマンド | 用途 |
|---|---|---|
| DABs (バンドル) | databricks bundle run |
バンドルでデプロイしたジョブを起動する |
| Databricks CLI | databricks jobs run-now |
既存のジョブ(Job ID指定)を即座に実行する |
| REST API | POST /api/2.1/jobs/run-now |
外部システムからジョブ実行をリクエストする |
| SQL (参考) | EXECUTE ... |
SQLのストアドプロシージャ等を呼ぶ際に使うが、ジョブには使わない |
バンドルを定義したyamlに関して、permissionとjobの階層はどんな感じになる?
resources:
jobs:
my_analysis_job: # ジョブの識別子
name: "Daily Analysis Job"
tasks:
- task_key: "run_notebook"
notebook_task:
notebook_path: "./notebook.ipynb"
# ここが permissions の階層
permissions:
- group_name: "data-engineers"
level: "CAN_MANAGE"
- group_name: "analysts"
level: "CAN_VIEW"
Saltingはどういう操作で、どういう時に有効?
Databricks(Apache Spark)におけるSalting(ソルティング)は、データの偏り(データスキュー / Data Skew)を解消し、並列処理のボトルネックを打破するための高度なチューニング手法です。
Salting の概要
| 項目 | 内容 |
|---|---|
| 主な目的 | データスキュー(特定のパーティションへのデータ集中)の解消 |
| 操作内容 | 結合キー等にランダムな数値(1〜N)を付加してキーを細分化する |
| 有効な場面 | 特定のID(例:Nullやデフォルト値、巨大顧客ID)による処理遅延時 |
| トレードオフ | 結合相手のテーブルをN倍に増幅させる必要がある(メモリ消費増) |
1. どんな時に有効か?(スキューの検知)
以下のような症状が出ている時に非常に有効です。
– 99%のタスクは数秒で終わるのに、最後の1つのタスクだけが数分〜数時間終わらない。
– Spark UI の「Executor」タブで、特定のコアだけがずっと稼働しており、他が遊んでいる。
– 特定のキー(例:user_id = 0 や country = 'US')のレコード数が数千万件ある。
Saltingはどうすれば行える?
以下のような形
from pyspark.sql import functions as F
# 塩の範囲(パーティションをいくつにバラしたいか)
salt_bins = 5
# テーブルA: 結合キーに 1〜5 のランダムな数値を付加
df_skewed = df_a.withColumn(
"salted_key",
F.concat(F.col("user_id"), F.lit("_"), F.expr(f"int(rand() * {salt_bins})"))
)
様々なフィルター条件を使われる場合、DeltaよりHiveの方が良いケースがある?
現代のDatabricks環境において「Hiveの方が良いケース」はほぼ存在しません。
特に「様々なフィルター条件(WHERE句)」が使われるアドホックなクエリや複雑なワークロードにおいては、Delta Lakeの方が圧倒的に有利です。
試験対策として、なぜHive(標準的なParquetテーブル)ではなくDeltaが選ばれるのか、フィルター性能の観点から比較表にまとめました。
Delta Lake vs Hive (Parquet) のフィルター性能比較
| 機能 | Hive (従来) | Delta Lake (推奨) | フィルターへの影響 |
|---|---|---|---|
| データスキップ | パーティション単位のみ | ファイル内の統計情報(min/max)単位 | 読み込むデータ量を劇的に削減 |
| Z-Order / Liquid | サポートなし | フルサポート | 複数カラムでの絞り込みを高速化 |
| 統計情報の管理 | 手動で ANALYZE が必要 |
書き込み時に自動収集 | 常に最適なクエリプランが作成される |
| ファイルサイズ管理 | 小さなファイルが溜まりやすい | OPTIMIZE で自動結合 |
I/O効率が上がり、スキャンが速くなる |
| インデックス | 基本なし(パーティションのみ) | ブルームフィルタ・インデックス | 特定の値を探す速度が向上 |
2. 多次元的な最適化 (Z-Order / Liquid Clustering)
Hiveのパーティションは通常1つか2つのカラムに限られます。
– Deltaの場合: Liquid Clustering を使えば、多数のカラムに対して柔軟にクラスタリングをかけられます。ユーザーが「日付で絞る」「顧客IDで絞る」「商品カテゴリで絞る」といったバラバラな条件で検索しても、どのパターンでも高速に応答できます。
3. ブルームフィルタ・インデックス (Bloom Filter Index)
特定の高カーディナリティなカラム(例:IDやシリアル番号)で完全一致検索(=)を多用する場合、Deltaではブルームフィルタを作成できます。これにより「そのファイルに探している値が含まれていないこと」を瞬時に判断できます。
Liquid Clusteringって記憶ではパーテーションキーとソートキーは固定しないと使えないはずだが、それでどうやって柔軟な検索が可能になる?
実は、Liquid Clusteringの最大の武器は「パーティションキーという概念を捨て、カラム(キー)を後から自由に入れ替えられる柔軟性」にあります。
なぜ「固定」ではなく「柔軟」なのか?
従来の方式(HiveパーティショニングやZ-Order)と比較して、Liquid Clusteringがなぜ柔軟と言われるのか、その理由を整理します。
| 特徴 | 従来のパーティショニング / Z-Order | Liquid Clustering (最新) |
|---|---|---|
| キーの固定 | テーブル作成時に固定。変更にはテーブル再作成が必要 | いつでも変更可能。 既存データを書き直さずにキーを入れ替えられる |
| 並べ替えの軸 | Z-Orderは一度に指定したカラムで物理固定される | データの分布に合わせて「増分的」にクラスタリングを最適化する |
| 検索の柔軟性 | 指定したキー以外での検索はフルスキャンになりがち | クラスタリングキーに含まれるどのカラムの組み合わせでも高速 |
| カーディナリティ | 高すぎると「スモールファイル問題」が発生する | 高カーディナリティ(ID等)でも自動でサイズ調整される |
Liquid Clusteringでは、以下のコマンドで運用中にキーを変更できます。
- テーブル作成後、やっぱり別のカラムで最適化したくなった場合 ALTER TABLE table_name CLUSTER BY (new_column_1, new_column_2);
このコマンドを打った後の OPTIMIZE 実行時から、新しいキーに基づいてデータが再配置され始めます。「テーブル定義を壊さずに、ビジネス要件に合わせて最適化の軸を変えられる」のが最大の柔軟性です。
2. 「多次元」での高速検索
Z-Orderも多次元検索に強いですが、データが増えるほど「並べ替えコスト」が指数関数的に増大します。
Liquid Clusteringは、「ヒルベルト曲線」などの高度なアルゴリズムを内部で利用し、複数のカラム(最大4つ推奨)の相関関係を保ったままデータを物理的にまとめます。
これにより、「日付のみ」「IDのみ」「日付+ID」といった、どのパターンのフィルター条件が来ても、関連するデータブロックだけを的確に狙い撃ち(Data Skipping)できます。
3. パーティションの「細分化」からの解放
「日付」でパーティションを切ると、1日のデータが少ない場合にファイルが細かくなりすぎて性能が落ちました。
Liquid Clusteringは「物理的なディレクトリ構造」に依存しないため、データ量に応じて自動でクラスターのサイズを調整します。これにより、設計者が「どの粒度でパーティションを切るべきか」と悩む必要がなくなりました。
試験対策のポイント
試験で「Liquid Clusteringの利点」を問われたら、以下のキーワードが正解に直結します。
– No Partition Evolution required: パーティションの再定義(マイグレーション)が不要。
– Support high-cardinality columns: 顧客IDのようなユニークな値が多いカラムでも性能劣化しない。
– Incremental Clustering: OPTIMIZE を実行するたびに、新しく入ってきたデータだけを賢くクラスタリングしてくれる。
Delta Live TablesとStructured Streamingのメリデメ表
Structured Streamingは「エンジン(部品)」であり、DLTはそのエンジンを積んだ「自動運転車(フレームワーク)」です。
DLT vs Structured Streaming 比較表
| 比較項目 | Structured Streaming (低レベルAPI) | Delta Live Tables (高レベル構成) |
|---|---|---|
| 主な役割 | リアルタイム・増分データ処理の記述 | パイプライン全体の管理・運用・品質保証 |
| 開発言語 | Python, Scala, Java, SQL | Python, SQL |
| インフラ管理 | クラスターの構成・起動を手動で管理 | インフラを自動スケール・自動管理 (Serverless) |
| 依存関係 | 各ジョブの実行順序を外部(Workflow等)で制御 | テーブル間の依存関係をグラフ(DAG)で自動解決 |
| データ品質 | ユーザーがチェックロジックを実装 | Expectations機能で宣言的に品質を定義 |
| モニタリング | Spark UI やログで個別に確認 | 統合UIでリネージやデータ品質を可視化 |
| コスト | 低い(自由度が高い分、管理コスト増) | 高め(DBUに加え管理機能のコスト) |
1. Structured Streaming のメリット・デメリット
- メリット:
- 究極の柔軟性: トリガーの間隔、チェックポイントの場所、状態管理(Stateful processing)を細かく制御できる。
- 既存コードの流用: 純粋なSparkコードなので、既存のライブラリや処理ロジックをそのまま組み込みやすい。
- デメリット:
- 運用負荷: チェックポイントの管理や、クラスターが落ちた際の再起動、スキーマ進化の対応などを自分で行う必要がある。
- 可視性の欠如: データの流れ(リネージ)を追うのが難しく、パイプライン全体の状態把握に手間がかかる。
2. Delta Live Tables (DLT) のメリット・デメリット
- メリット:
- 宣言的開発: 「どう動かすか」ではなく「どんなテーブルを作りたいか」を書くだけで、システムが最適化してくれる。
- データ品質 (Expectations):
expect_or_dropなどを使って、不正データを自動で除外・監視できる。 - 自動化: クラスターのサイズ調整(Enhanced Autoscaling)や、エラー時の自動リトライが標準搭載。
- デメリット:
- 制約: DLTのフレームワーク内で許可されていない操作(特定のライブラリの使用や複雑なI/O)が制限されることがある。
- 学習コスト: DLT独自の構文(
live.プレフィックスなど)やライフサイクルを理解する必要がある。
Delta Live TablesとStructured Streamingを併用したコード例
import dlt
from pyspark.sql import functions as F
# --- 1. Bronze層: 外部ソースから Structured Streaming で取り込み ---
# spark.readStream + format("cloudFiles") を使用
@dlt.table(
name="raw_game_logs",
comment="Auto Loaderを使用してクラウドストレージから生ログを取り込む"
)
def raw_game_logs():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/mnt/data/events/")
)
# --- 2. Silver層: DLT間のストリーミング接続とデータ品質管理 ---
# dlt.read_stream を使用して、上流のテーブルから増分を読み出す
@dlt.table(
name="cleaned_game_logs",
comment="不正なスコアを除去し、タイムスタンプを正規化する"
)
@dlt.expect_or_drop("valid_score", "score >= 0") # データ品質の制約
def cleaned_game_logs():
return (
dlt.read_stream("raw_game_logs") # 前のテーブルをストリームとして参照
.withColumn("event_timestamp", F.to_timestamp("event_time"))
.select("user_id", "event_type", "score", "event_timestamp")
)
# --- 3. Gold層: マテリアライズドビューによる集計 ---
# 集計(Window関数など)を行うため、ここではストリームではなくテーブルとして定義
@dlt.table(
name="daily_score_summary",
comment="ユーザーごとの日次合計スコア"
)
def daily_score_summary():
return (
dlt.read("cleaned_game_logs") # 集計の場合は read_stream ではなく read を使うのが一般的
.groupBy("user_id", F.window("event_timestamp", "1 day"))
.agg(F.sum("score").alias("total_daily_score"))
)
| 特徴 | ストリーミングテーブル | マテリアライズドビュー |
|---|---|---|
| 定義方法 (Python) | dlt.read_stream(...) |
dlt.read(...) |
| SQL構文 | CREATE OR REFRESH STREAMING TABLE |
CREATE OR REFRESH LIVE TABLE |
| データの処理 | 「増分」 のみ。新しく届いたデータだけを既存テーブルに追加(Append)する。 | 「最新状態」 を反映。基になるテーブル全体をスキャンして結果を更新する。 |
| 主な用途 | Bronze, Silver層(大量データの高速な取り込み、単純な変換) | Gold層(集計、ランキング、複雑なJoin、最新マスタの反映) |
| 過去データの修正 | 過去分を自動で修正するのは苦手(再計算が必要)。 | ソースが変われば、次回の更新時に計算結果が自動的に修正される。 |
Bronze層で expect(警告のみ)を設定しつつ、その違反レコードをSilver層で「後追い」で除外したいというシナリオ
最もスマートな方法は
「Bronzeは全件入れておき、Silverの定義で expect_or_drop を使う」
という構成です。これにより、Bronzeには調査用の「不正なデータ」を残しつつ、分析用のSilverはクリーンに保てます。
| レイヤー | 設定内容 (Expectation) | データの状態 | 目的 |
|---|---|---|---|
| Bronze | expect |
全レコード(違反含む)を保持 | 監査・デバッグ・元の姿の保存 |
| Silver | expect_or_drop |
違反レコードを自動除外 | 信頼できる分析用データの作成 |
spark.readStream と dlt.read_stream の違いとは?
| 特徴 | spark.readStream |
dlt.read_stream |
|---|---|---|
| 参照先 | 外部ソース (S3, ADLS, Kafka等) | DLT内の別のテーブル (ライブテーブル) |
| 役割 | パイプラインの「入り口」を定義する | パイプライン内の「依存関係」を構築する |
| 依存関係の解決 | 明示的なパス指定が必要 | DLTが自動でテーブル間の順序を計算する |
| 主要な用途 | Auto Loader (cloudFiles) での初回読込 | Bronze → Silver → Gold のデータ伝搬 |
1. spark.readStream:外部からの「取り込み」
spark.readStream は、標準的な Apache Spark の命令です。DLT の外にある「ファイル」や「メッセージキュー」からデータを吸い上げる時に使います。
- いつ使う?: パイプラインの一番最初(通常は Bronze 層)で、クラウドストレージ上の RAW データを読み込むとき。
- 特徴:
cloudFiles(Auto Loader) などのフォーマットを指定して、外部との接続を確立します。
2. dlt.read_stream:内部での「参照」
dlt.read_stream は、DLT フレームワーク専用の特別な関数です。
- いつ使う?: すでに DLT 内で定義した別のテーブル(例:
bronze_table)から、次のテーブル(例:silver_table)へデータを流したいとき。 - 最大のメリット(自動DAG構築):
この関数を使うことで、Databricks は「テーブル A はテーブル B の前に実行しなければならない」という依存関係(DAG: Directed Acyclic Graph)を自動で理解します。
cloudFiles (Auto Loader) ってなに?
| 特徴 | 内容 |
|---|---|
| 主な役割 | 数百万件のファイルを効率的にスキャンし、増分データのみを処理する |
| コード記述 | spark.readStream.format("cloudFiles").load(path) |
| 検知モード | 「ディレクトリ一覧(Directory Listing)」と「ファイル通知(File Notification)」 |
| 最大の強み | スキーマ推論(Inference)とスキーマ進化(Evolution)の自動化 |
| チェックポイント | どこまで読み込んだかを記録し、停止しても再開可能(Exactly-once) |
cloudFiles=AutoLoaderってこと?
結論から言うと、「概念(機能名)としては Auto Loader」であり、「コード上の指定(識別子)としては cloudFiles」です。
TRANSFORM関数とexpect actuialでテストする場合の具体例
| メリット | 内容 |
|---|---|
| 可読性 | df = func3(func2(func1(df))) という「入れ子」を避け、直列に書ける |
| テストのしやすさ | 関数が独立しているため、小さな「Actualデータ」を入れて期待通りか検証できる |
| DLTとの相性 | DLTの定義内でこれらの関数を呼び出すだけで、ロジックがスッキリする |
| カプセル化 | 複雑なビジネスロジックを関数の中に隠し、メイン処理の視認性を保てる |
具体的なコード例
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
# 1. 変換ロジックを独立した関数として定義(再利用・テスト可能)
def add_tax_transform(df: DataFrame) -> DataFrame:
return df.withColumn("total_price", F.col("price") * 1.1)
def filter_invalid_orders(df: DataFrame) -> DataFrame:
return df.filter(F.col("price") > 0)
# 2. 実際のジョブやDLTでの使用例(メソッドチェーン)
# transform() を使うことで、上から下へ流れるように書ける
df_result = (
spark.read.table("bronze_orders")
.transform(filter_invalid_orders)
.transform(add_tax_transform)
)
# --- テストコードのイメージ ---
# 1. テスト用データの作成 (Actual)
input_df = spark.createDataFrame([(100,), (200,)], ["price"])
# 2. ロジックの適用
actual_df = input_df.transform(add_tax_transform)
# 3. 期待するデータの作成 (Expected)
expected_df = spark.createDataFrame([(100, 110.0), (200, 220.0)], ["price", "total_price"])
# 4. 比較(一致すればテスト合格)
assert actual_df.collect() == expected_df.collect()
多様な時間でフィルターする場合は、時間はISOS形式の文字列で保存で充分?それとも値?
「文字列(ISO 8601)」ではなく、必ず「Timestamp型(内部的にはLong値)」で保存すべきです。
| 比較項目 | 文字列 (ISO形式) | Timestamp型 / Date型 |
|---|---|---|
| データサイズ | 大きい (例: 20文字以上) | 小さい (内部的に 8バイト等の数値) |
| フィルター性能 | 遅い(文字列比較になる) | 非常に速い(数値比較で最適化される) |
| データスキップ | 効きにくい(辞書順になる) | 強力に効く(Min/Max統計が正確) |
| 関数の利用 | to_timestamp() 等の変換が必要 |
直接 year(), window() 等が使える |
| 標準化 | タイムゾーンの扱いにミスが出やすい | UTC等の標準形式で厳密に管理可能 |
なぜ「数値(Timestamp型)」の方がフィルターに強いのか?
理由1. Delta Lake の「Data Skipping」
Delta Lake は、ファイルの各カラムの Min(最小値) と Max(最大値) を統計情報として保持しています。
– Timestamp型の場合: 数値として管理されているため、WHERE time > '2026-03-01' というクエリに対し、エンジンが「このファイルの最大値は 2026-02-28 だから読む必要なし」と瞬時に判断(スキップ)できます。
– 文字列の場合: 文字列としての Min/Max 比較になるため、型に比べるとオーバーヘッドが大きく、複雑な時間計算(「1時間前」など)を含むフィルターの最適化が困難になります。
2. パーティショニングと Z-Order
時間をフィルターの主軸にする場合、Date 型に変換してパーティショニングしたり、Timestamp 型で Z-Order や Liquid Clustering を適用したりします。
これらは数値的な順序に基づいてデータを物理的に並べ替えるため、型が一致していないと十分な性能を発揮できません。
Timestamp型を効率的に、かつ多様な条件でフィルターする際のSQL例
-- 特定の期間を指定 SELECT * FROM orders WHERE order_timestamp BETWEEN '2026-01-01 00:00:00' AND '2026-01-31 23:59:59'; -- 単純な比較演算子 SELECT * FROM orders WHERE order_timestamp >= '2026-03-01';
※内部的に「これは日時のデータだな」と 暗黙的に型変換(キャスト) して比較してくれている
Shuffle Partitionとは何?
| 項目 | 内容 |
|---|---|
| 発生タイミング | Wide Transformation(Join, GroupBy, Distinctなど)の実行時 |
| デフォルト値 | 200 (オープンソースSparkの場合) |
| 設定パラメータ | spark.sql.shuffle.partitions |
| 役割 | シャッフル後のタスクの並列度を決定する |
| 影響 | 少なすぎるとメモリ不足(OOM)、多すぎるとオーバーヘッド増 |
なぜ「シャッフル」が必要なのか?
例えば、各ノードにバラバラに散らばっている「売上データ」を「地域ごと」に集計したい場合、同じ地域のデータを一つの場所に集める必要があります。この 「データをネットワーク越しに移動させて再配置するプロセス」がシャッフルです。
このとき、移動したデータをいくつの塊(パーティション)に分けるかを決めるのが Shuffle Partition です。
Liquid Clusteringってパーテーションキーとソートキーは途中でいくらでも変えれる?
はい、いつでも、何度でも変更可能です。
Delta Sharingに関するopen share機能って静的Deltaテーブルのみ共有可能?ノートブックなどは?
Open Sharingはdatabricks外への共有を行うもの
| アセットの種類 | Open Sharing (外部) | Databricks-to-Databricks |
|---|---|---|
| Delta テーブル | ○ 共有可能 | ○ 共有可能 |
| ビュー (View) | ○ 共有可能 (Unity Catalog経由) | ○ 共有可能 |
| ボリューム (Volumns) | ○ 共有可能 (非定形ファイル) | ○ 共有可能 |
| ノートブック | × 共有不可 | ○ 共有可能 |
| モデル (MLflow) | × 共有不可 | ○ 共有可能 |
Open Sharingの手順
1. アクティベーションリンクの送付:
データ提供者が共有設定をすると、一回限りの「アクティベーションリンク(URL)」が発行されます。これを相手にメール等で送ります。
2. 資格情報ファイル(JSON)のダウンロード:
相手がそのリンクをクリックすると、資格情報ファイル(config.share)というJSONファイルがダウンロードできます。
– 注意: このリンクには有効期限があり、一度しかダウンロードできません。
3. オープンソースクライアントでの接続:
相手は、Python、Pandas、Power BI、Apache Spark、TableauなどのDelta Sharing対応クライアントにそのJSONファイルを読み込ませることで、あたかも自分の手元にデータがあるかのようにクエリを実行できます。
データを保存する場合、UCのマスク関数では、保存されるデータはマスクされない問題がある?
はい、その通りです。
Unity Catalog(UC)の「カラムマスク(Masking Functions)」は、「読み取り時(クエリ実行時)」に動的にデータを隠す機能であり、ストレージ(S3やADLS上のDeltaファイル)に保存されている物理データそのものを書き換えるわけではありません。
マテリアライズドビューとはどんなもの?
Materialized Viewは、簡単に言うと「クエリの結果をあらかじめ計算して、物理的に保存しておくテーブル」のことです。
| 特徴 | 通常のビュー (View) | マテリアライズドビュー (MV) | 通常のテーブル (Table) |
|---|---|---|---|
| 実体データ | なし(定義のみ) | あり(物理保存) | あり(物理保存) |
| 読み取り速度 | 遅い(毎回計算) | 速い(計算済み) | 速い |
| 最新性 | 常に最新 | リフレッシュが必要 | 手動更新 |
| 計算コスト | 読み取りのたびに発生 | リフレッシュ時に発生 | 書き込み時に発生 |
コード比較例
通常のビュー
-- 定義するだけ(データは保存されない) CREATE VIEW main.default.high_value_orders_view AS SELECT order_id, customer_id, amount FROM main.default.orders WHERE amount > 1000;
マテリアライズドビュー
-- データを計算して物理保存する CREATE MATERIALIZED VIEW main.default.high_value_orders_mv AS SELECT order_id, customer_id, amount FROM main.default.orders WHERE amount > 1000; -- 最新状態にするには「リフレッシュ」が必要 REFRESH MATERIALIZED VIEW main.default.high_value_orders_mv;
DLT版 通常のビュー
import dlt
@dlt.view
def temporary_filter_view():
# パイプライン内でのみ有効な一時的なビュー
# カタログには保存されない
return dlt.read("raw_data").filter("id IS NOT NULL")
DLT版 マテリアライズドビュー
import dlt
@dlt.table(
name="sales_summary_mv",
comment="集計結果を保持するマテリアライズドビュー"
)
def sales_summary():
# dlt.read() を使うことで、上流の全データを読み込んで集計し、
# その結果を物理テーブルとして保存する
return (
dlt.read("cleaned_sales")
.groupBy("region")
.sum("amount")
)
具体的な挙動のイメージ
例えば、数億件の「注文ログ」から「日別の売上集計」を出す場合
1. 作成: CREATE MATERIALIZED VIEW daily_sales AS SELECT date, sum(amount) FROM orders GROUP BY date;
2. 物理保存: Databricks はこの集計結果を Delta テーブルとして保存します。
3. 参照: ユーザーが SELECT * FROM daily_sales と打つと、数億件の計算を飛ばして、集計後の数千行を読み込むだけになります。
4. リフレッシュ: 新しい注文データが入ってきたら、REFRESH MATERIALIZED VIEW daily_sales を実行(または自動スケジュール)して中身を最新にします。
試験・実務でのポイント
- 「いつ MV を使うべきか?」:
- ベーステーブルが巨大。
- 変換処理(Transform)が重い。
- クエリの頻度が高い(BIダッシュボードなど)。
- 「最新性」の妥当性:
- MV はリフレッシュされるまでデータが古くなる可能性があります。秒単位のリアルタイム性が必要な場合は、通常のビューやストリーミングテーブルを検討します。
- Unity Catalog での管理:
- 最近の Databricks では、Unity Catalog 上で作成する MV が推奨されています。これにより、他の BI ツールからも高速な MV にアクセスできるようになります。
マテリアライズドビューは複雑な財務処理関数などと相性いい?
「非常に相性が良い」です。
複雑な財務処理(外貨換算、複雑な償却計算、税額算出など)は計算コストが高く、かつコードの保守性が求められるため、マテリアライズドビュー(MV)を活用するメリットが最大化されます。
計算コストの「事前払い」ができるため
列のマスクを、ユーザーグループごとによってUDFを使って実現する場合、どんな実装になる?
- マスク用関数(SQL UDF)の作成
CREATE OR REPLACE FUNCTION email_mask(email STRING)
RETURN CASE
-- HR(人事)グループならそのまま見せる
WHEN is_account_group_member('hr_users') THEN email
-- 一般ユーザーならドメイン以外を隠す(例: a***@example.com)
WHEN is_account_group_member('standard_users') THEN regexp_replace(email, '^.*(?=@)', '***')
-- それ以外(外部の人など)は完全に隠す
ELSE 'REDACTED'
END;
- テーブルのカラムにマスクを適用する
-- テーブル作成時に指定する場合 CREATE TABLE customers ( user_id INT, email STRING MASK email_mask, -- ここで関数をバインド full_name STRING ); -- 既存のテーブルに後から適用する場合 ALTER TABLE customers ALTER COLUMN email SET MASK email_mask;
CatalogレベルでUCで権限設定してる場合、現状あるテーブルや、以降作る配下のスキーマなどの権限はどうなる?カスケード(継承される)?
はい、Unity Catalog(UC)の権限は「カスケード(継承)」されます。
1. メタストア (Metastore)
└── 2. カタログ (Catalog) ← ここで権限を振ると...
└── 3. スキーマ (Schema) ← 自動で継承
└── 4. テーブル / ビュー / ボリューム ← 自動で継承
3. 注意点:継承の「上書き」や「拒否」はできる?
実務や試験で混乱しやすいポイントがいくつかあります。
– 「例外的に一部のスキーマだけ隠す」は難しい:
カタログレベルで SELECT を与えてしまうと、そのカタログ内の特定のスキーマだけを見せないようにすることはできません。
– 対策: 権限を絞りたい場合は、カタログレベルではなく、個別のスキーマレベルで権限を付与するのがベストプラクティスです。
– Ownership(所有権)の継承:
所有権(OWNER)は継承されません。カタログの所有者であっても、他人が作ったスキーマの所有者にはなりませんが、カタログレベルの強力な権限(CREATE など)によって管理は可能です。
Autoloaderで、imageや、binary、textって括りはある?
Auto Loader(cloudFiles)において、「image」や「binary」といった専用のフォーマット指定(括り)はありません。
Auto Loaderが直接サポートしている「ファイル形式」は、あくまで構造化・半構造化データ(JSON, CSV, Parquet等)が中心です。しかし、画像やバイナリファイルを扱いたい場合には、binaryFile という形式を組み合わせて使うのが一般的です。
SCD Type 1 vs. Type 2ってなに
データエンジニアリングの世界で避けては通れないのが、この SCD(Slowly Changing Dimension:ゆっくり変化する次元) です。
「マスタデータの値が変わったとき、過去の履歴をどう扱うか?」 という戦略の違いです。
SCD Type 1:上書き(履歴を残さない)
「過去なんて振り返らない」スタイルです。既存のレコードを新しい値でそのまま上書きします。
- 挙動:
UPDATE処理のみ。 - 用途: 名前のタイポ修正や、過去の値を保持する必要がまったくない項目(例:Emailアドレスの変更など)。
- メリット: データ量が最小限で済み、クエリが非常にシンプル。
- デメリット: 「1ヶ月前はどうだったか?」という分析が不可能になる。
SCD Type 2:履歴保持(新しい行を追加)
「過去も大切にする」スタイルです。値が変わるたびに新しい行を追加し、どのデータが「いつからいつまで有効だったか」を管理します。
- 挙動: 既存の行を「古い」とし、新しい行を
INSERTする。 - 管理カラム: 通常、
start_date(有効開始日)、end_date(有効終了日)、is_current(最新フラグ)を追加します。 - 用途: 住所(地域別の売上分析に必要)、役職、製品価格など、時間の経過に伴う変化を追いたい項目。
- メリット: 過去の任意の時点の状態を正確に再現できる。
- デメリット: データ量が肥大化し、結合(Join)やクエリが複雑になる。
| 比較項目 | SCD Type 1 | SCD Type 2 |
|---|---|---|
| データ操作 | Overwrite(上書き) | Add a new row(行追加) |
| 履歴の保持 | なし | あり(完全な履歴) |
| テーブルサイズ | 変化なし | 増大し続ける |
| 複雑さ | 低い(単純なUpdate) | 高い(日付管理が必要) |
| 主な用途 | ミスの修正、重要度の低い更新 | 監査要件、時系列分析、トレンド調査 |
CDC データってなに
CDC (Change Data Capture) とは、日本語で「変更データキャプチャ」と呼ばれ、データベースに加えられた「追加(Insert)」「更新(Update)」「削除(Delete)」の変更履歴だけを抽出して、別のシステムに同期させる手法のことです。
これまでに話した「Auto Loader」や「SCD Type 2」と非常に関連が深い技術です。
なぜ CDC が必要なのか?
例えば、基幹システムの MySQL にある「顧客テーブル」を Databricks に同期したいとします。
- 従来の方法(フルロード):
- 毎日、数千万件の全データを引っこ抜く。
- → データベースに負荷がかかりすぎるし、時間もかかる。
- 毎日、数千万件の全データを引っこ抜く。
- CDC の方法:
- 「今日の10時に Aさんの住所が変わった」というログだけを追いかける。
- → 負荷が極めて低く、ほぼリアルタイムで同期できる。
- 「今日の10時に Aさんの住所が変わった」というログだけを追いかける。
CDC データの構造(イメージ)
CDC ツール(Debezium など)を通すと、データは通常以下のような「メタデータ付きの形式」で届きます。
| 変更内容 (Op) | 顧客ID | 名前 | 住所 | 更新日時 |
|---|---|---|---|---|
| I (Insert) | 101 | 田中 | 東京 | 09:00 |
| U (Update) | 101 | 田中 | 大阪 | 10:30 |
| D (Delete) | 105 | 佐藤 | NULL | 11:00 |
Databricks における CDC の強力な味方
CDC データは「単なる変更の羅列」なので、そのままではテーブルとして使えません(田中さんの最新住所は?という問いに答えにくい)。そこで、以下の機能を使います。
1. APPLY CHANGES INTO (Delta Live Tables)
これが最強のツールです。CDC で届いた「I, U, D」のフラグを読み取り、ターゲットの Delta テーブルに対して SCD Type 1 や Type 2 を自動適用してくれます。
2. MERGE 文
SQL や Python で手動で CDC データを反映させる場合は、MERGE 文を使います。「マッチしたら Update、なければ Insert」という処理を 1 つのトランザクションで行います。
Databricks(Delta Lake)でSCDやCDCを扱う際の、「現場でそのまま使えるSQL」を整理します。
主に、手動で実行する MERGE INTO と、Delta Live Tables (DLT) で自動化する APPLY CHANGES INTO の2つのパターンがあります。
1. 手動で行う:MERGE INTO (SCD Type 1)
「既存のデータがあれば更新、なければ追加」という最も一般的な操作です。
MERGE INTO customers AS target
USING customer_updates AS source
ON target.user_id = source.user_id
-- 1. マッチした場合:新しい値で上書き(SCD Type 1)
WHEN MATCHED THEN
UPDATE SET
target.email = source.email,
target.address = source.address,
target.updated_at = source.updated_at
-- 2. マッチしなかった場合:新規作成
WHEN NOT MATCHED THEN
INSERT (user_id, name, email, address, updated_at)
VALUES (source.user_id, source.name, source.email, source.address, source.updated_at);
2. DLTで自動化する:APPLY CHANGES INTO
CDCデータを処理する場合、この構文を使うのがDatabricksのベストプラクティスです。
SCD Type 1 の場合(上書き)
CREATE OR REFRESH STREAMING TABLE target_table; APPLY CHANGES INTO LIVE.target_table FROM STREAM(LIVE.source_cdc_stream) KEYS (user_id) - 主キー SEQUENCE BY updated_at - 順序保証用のカラム COLUMNS EXCEPT (event_type) - 除外するカラムがあれば指定 STORED AS SCD TYPE 1; - Type 1(履歴なし)を指定
SCD Type 2 の場合(履歴保持)
これだけで、__start_at や __end_at といった履歴管理用のカラムが自動生成されます。
APPLY CHANGES INTO LIVE.target_table FROM STREAM(LIVE.source_cdc_stream) KEYS (user_id) SEQUENCE BY updated_at STORED AS SCD TYPE 2; -- Type 2(履歴保持)を指定
Change Data Feedを実際に使う例
Change Data Feed (CDF) は、Deltaテーブルに加えられた「変更そのもの(インサート、更新前の値、更新後の値、削除)」を、ログとして抽出できる機能です。
単に「最新の状態」を見るのではなく、「何がどう変わったか」という履歴を、下流の処理に効率的に伝えるために使われます。
CDF を使うための 3 ステップ
1. テーブルで CDF を有効にする
デフォルトではオフなので、テーブル作成時か後から有効にする必要があります。
- テーブル作成時に有効化 CREATE TABLE silver_users ( id INT, name STRING, city STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); - 既存のテーブルで有効化 ALTER TABLE silver_users SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
2. データの変更操作を行う
通常通り、INSERT や UPDATE を行います。
UPDATE silver_users SET city = 'Tokyo' WHERE id = 101;
3. 変更点だけを読み出す (SQL / Python)
ここが CDF の真骨頂です。特定のバージョン間や時間帯の「差分」を指定して取得できます。
SQL の場合:
- バージョン1から最新までの「変更履歴」を取得
SELECT FROM table_changes('silver_users', 1);
- 特定の時間範囲で取得
SELECT FROM table_changes('silver_users', '2026-03-04 10:00:00', '2026-03-04 12:00:00');
ウォーターマークとは、使用例
ウォーターマーク(Watermark)とは、ストリーミング処理において「遅れて届いたデータをどこまで待つか」を定義する境界線のことです。
from pyspark.sql import functions as F
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/datapath"))
ウォーターマークの設定:event_timeを基準に、5分間の遅延を許容
windowed_counts = (df
.withWatermark("event_time", "5 minutes")
.groupBy(
F.window("event_time", "10 minutes")
)
.count())
この設定での挙動
- 20:00 〜 20:10 のウィンドウが集計中。
- 20:05 に発生したデータが、20:09 に届いた → 集計に含まれる。
- 20:02 に発生したデータが、20:12 に届いた → 5分以上の遅れだが、ウォーターマークがまだ更新されていなければ入る可能性がある(※詳細後述)。
- システムが 20:15 のデータを処理し、ウォーターマークが 20:10 まで進んだ後、20:04 のデータが届いた → 完全に無視される。
ウォーターマーク、with句、windowなど、必要なものを用いて、以下のアラートを生むSQL
条件
- 1日一回アラート
- アラート条件を満たす時は毎回アラート
- 毎日何回もアラートは出ない
- 条件は3日連続平均気温が5度以上の時
-- 1. 日次の平均気温を算出するストリーミングテーブル(Bronze -> Silver)
-- ここで1日1行のデータに集約し、ウォーターマークで古いデータを管理します
WITH daily_summary AS (
SELECT
window.start AS alert_date,
AVG(temperature) AS avg_daily_temp
FROM STREAM(sensor_data)
-- データの遅延を最大1日許容
WATERMARK event_time DELAY '1 day'
-- 1日単位でウィンドウ集計(これで1日1件のレコードになる)
GROUP BY window(event_time, '1 day')
),
-- 2. 3日間の移動平均を計算する(Silver -> Gold相当)
moving_average_stats AS (
SELECT
alert_date,
avg_daily_temp,
-- 過去2行+現在行(計3日)の平均を算出
AVG(avg_daily_temp) OVER (
ORDER BY alert_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS three_day_avg,
-- 3日分のデータが揃っているかチェックするためのカウント
COUNT(avg_daily_temp) OVER (
ORDER BY alert_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS day_count
FROM daily_summary
)
-- 3. 最終的なアラート判定
SELECT
alert_date,
three_day_avg,
'ALERT: 3-day consecutive average temperature >= 5.0' AS alert_message
FROM moving_average_stats
WHERE
-- 3日間の平均が5度以上
three_day_avg >= 5
-- 運用開始直後(1日目や2日目)の誤検知を防ぐため、3日分揃っていることを確認
AND day_count = 3
Coalesce vs Repartition:の違い。まずCoalesce(1)とは何
Coalesce(1) は、「バラバラに散らばったデータを、無理やり1つの箱(パーティション)に凝縮する」操作です。
データエンジニアリングの実務において、Coalesce(1) が登場するシーンはほぼ一つ:「最終結果を1つのファイルとして出力したい時」です。
1. Coalesce(1) とは何をしているのか?
Apache Sparkにおいて、データは通常複数のワーカーノードに分散して保持されています。Coalesce(1) を実行すると、Sparkはネットワーク経由で全データを1つのノードに集め、1つのパーティションにまとめます。
最大の特徴:シャッフルを「最小限」に抑える
Repartition との決定的な違いは、フルシャッフル(全データの無差別な再配置)を避ける点です。Coalesce は既存のパーティションを「結合」するだけなので、データの移動を最小限に抑えようとします。
| 特徴 | Coalesce | Repartition |
|---|---|---|
| パーティション数 | 減らすことしかできない | 増やすことも減らすことも可能 |
| シャッフル | 基本発生しない(最小限) | 必ず発生する(フルシャッフル) |
| データの均一性 | 偏り(スキュー)が出やすい | 均等に再配置される |
| 主な用途 | 処理の最後にファイル数を減らす | 並列度を上げて処理を高速化する |
| コスト | 低い(効率的) | 高い(ネットワーク負荷大) |
3. なぜ Coalesce(1) を使うのか?(実務例)
一番多いのは、外部システム(Excelで読み込むCSVや、特定のツールが読み込む設定ファイルなど)が「1つのファイルしか受け付けない」場合です。
# 何百個もファイルができるのを防ぎ、result.csv 1つだけを作る
df.coalesce(1).write.format("csv").save("/path/output")
Wheel Fileを、インターネット経由なしで活用したい場合、どんな感じでインストールするのが望ましい?
インターネットに接続できない環境(エアギャップ環境や厳しいプロキシ制限下)でPythonの Wheelファイル (.whl) を活用する場合、
Databricksでは
「Unity CatalogのVolumes」
または
「Workspace Files」
にファイルを配置してインストールするのが現代的でスマートな方法です。
環境や用途に合わせて、望ましい3つのアプローチを解説します。
1. Unity Catalog Volumes を使う (推奨:最もセキュアで近代的)
Unity Catalogが有効な環境であれば、Volumes(ボリューム)を「社内限定のアーティファクト置き場」として使うのがベストです。
- 手順:
- UIまたはCLIで、ローカルの
.whlファイルを Volumes のパス(例:/Volumes/main/default/my_lib_vol/)にアップロードします。 - ノートブックの先頭で以下を実行します。Python
%pip install /Volumes/main/default/my_lib_vol/my_package-0.1-py3-none-any.whl
- UIまたはCLIで、ローカルの
-
メリット: 権限管理(GRANT)が効くため、特定のチームだけにライブラリの使用を許可できます。
複雑な財務処理UDF × Pandas on Spark の上手い使い方はどんな感じ?
各グループ(例:国別、通貨別)ごとに複雑な計算を行う場合、applyInPandas (Grouped Map UDF) を使うのがベストです。
- 理由: 通常のUDFは1行ずつ処理するため遅いですが、
applyInPandasはグループ単位でデータを Pandas DataFrame としてまとめて Python プロセスに渡し、ベクトル演算(一括計算)を行います。 - メリット: 財務ライブラリ(numpyや既存のPythonロジック)をそのまま使いつつ、Sparkで並列処理ができます。
クエリの実行時間はどこで見るべき?
- Query Profile (SQLブラウザ):
- 「どの演算(JoinやFilter)に時間がかかったか」「インデックス(Data Skipping)が効いているか」を視覚的に見る場所です。SQLチューニングならここ一択です。
- ジョブの履歴画面 / Spark UI:
- 「特定のタスクが偏っていないか(スキュー)」「ネットワーク転送(シャッフル)が重くないか」など、インフラ側の挙動を確認するのに適しています。
Shuffle Hash Join とは
結合する両方のテーブルを、結合キーに基づいて全ノードにシャッフル(再配置)し、各ノードでハッシュテーブルを作成して結合する手法です。
- 使い時:
- 片方のテーブルがメモリに乗るほど小さくはないが、Sort Merge Join(並べ替え)をするには重すぎる、という中規模なテーブル同士の結合で有効です。
# Join実行
joined_df = df_large.join(df_medium.hint("SHUFFLE_HASH"), "key")
# 実行計画(Physical Plan)の確認
joined_df.explain()
Sort Merge JoinとShuffle Hash Joinの比較
| 特徴 | Sort Merge Join (SMJ) | Shuffle Hash Join (SHJ) |
|---|---|---|
| デフォルト | 標準設定(最も汎用的) | 特定の条件下でのみ選択される |
| 主な処理手順 | Shuffle → Sort → Merge | Shuffle → Hash Table構築 → Probe |
| メモリ消費 | 低(ソート済みなら順次読み込むだけ) | 高(片方のデータをメモリ上に保持) |
| CPU負荷 | 高(ソート処理が重い) | 低(ハッシュ検索なので高速) |
| 安定性 | 非常に高い(メモリ不足時にディスクへ逃がせる) | やや低い(ハッシュ表が溢れるとOOM) |
| データサイズ | 超大規模同士でもOK | 片方が「中規模」の時に有利 |
データのスキュー回避策
- maxPartitionBytes の調整: 1パーティションあたりのサイズを小さくして、タスクをより細かく分散させるのは有効です。
- インスタンス強化: メモリを増やすと OOM(メモリ不足)は防げますが、スキュー(1つのコアだけ頑張っている状態)の根本解決にはなりません。
- Salting (ソルティング): 結局、これが最も根本的な解決策です。
タグをつけるコマンド
テーブルやカラムにタグを付けるのは ALTER TABLE コマンド(SQL)で行います。
ALTER TABLE table_name SET TAGS ('department' = 'finance', 'priority' = 'high');
※ sql.conf はセッションごとの設定変更に使うもので、メタデータ(タグ)の付与には使いません。
テーブルやカラムにタグを付けるのはどんな時?
1. セキュリティとコンプライアンス(PII管理)
最も一般的かつ重要な用途です。個人情報(PII)や機密情報が含まれるカラムにタグを付けます。
- タグの例:
Sensitivity: High,PII: Email,GDPR: Relevant - メリット: どのテーブルに個人情報があるか一目で分かり、監査レポートの作成が容易になります。将来的に、特定のタグが付いたカラムを一括で非表示にするような「属性ベースのアクセス制御(ABAC)」の基盤になります。
2. データカタログの検索性向上(データ発見)
数千のテーブルがある環境では、名前だけでは中身が判断できません。ユーザーがデータを探しやすくするためにタグを使います。
- タグの例:
Project: Marketing_Alpha,Status: Gold,Domain: Sales - メリット: Catalog Explorer の検索窓に「Marketing」と打ち込むだけで、関連するテーブルを即座に絞り込めます。
AIがカラムの説明を自動生成する機能はある?
Unity Catalog(UC)の機能として、
AI-generated documentation
が提供されています。
カタログエクスプローラー上で「Generate」ボタンを押すと、テーブル名やデータの中身をAIが推論し、カラムの説明(Description)を自動で下書きしてくれます。
DatabricksのAI-generated documentationについては、テーブルにカラムが追加される?
テーブルに新しいカラム自体が物理的に追加されることはありません。
AIが生成したテキストは、Unity Catalog上の以下のフィールドに書き込まれます。
– Table Comment: テーブル全体の概要。
– Column Comment: 各カラムの意味や内容の説明。
Delta は外部テーブルの最新情報を保証する?
- Managed Table(管理テーブル): Databricksが完全に管理するため、常に最新です。
- External Table(外部テーブル): Delta形式であれば、他のエンジンで書き込まれても
VACUUMやOPTIMIZEの整合性は保たれますが、非Delta(Parquet等)の外部ファイルを直接参照している場合は、REFRESH TABLEを実行するまで新しく追加されたファイルが見えないことがあります。
Standard と Dedicated みたいな設定はある?
おそらく SQL Warehouse の設定のことだと思われます。
- Serverless: リソース管理が不要で、すぐに起動する(推奨)。
- Pro: ワークフローなどに適した標準的な構成。
- Classic: 最も基本的な構成(古い機能)。
「Dedicated(占有)」という言葉は、特定のクラスターを1人のユーザーや1つのジョブで占有して使う設定を指す際によく使われます。
まず@dltってコマンドは何を意味する?
Databricksのデータエンジニアリングにおいて、@dlt(またはPythonのデコレータとしての @dlt.table / @dlt.view)は、Delta Live Tables (DLT) というパイプライン専用のフレームワークで使われる宣言的な修飾子を意味します。
簡単に言うと、「これから書く関数(またはSQL文)は、ただのクエリではなく、DLTパイプラインの一部として管理されるテーブルだよ」とシステムに教える合図です。
@dlt が持つ主な役割
DLTパイプラインを実行する際、このデコレータがあることで以下の処理が自動化されます。
- 依存関係の自動解決 (DAGの構築):
どのテーブルがどのテーブルを参照しているかを解析し、正しい順番でデータを処理する「実行計画(グラフ)」を自動で作ってくれます。 - スキーマの自動管理:
データの構造が変わっても、ターゲットとなるDelta Tableのスキーマを自動的に適用・更新します。 - チェックポイントとリトライ:
処理が途中で止まっても、どこまで進んだかを記録しているため、最初からやり直す必要がありません。
import dlt
@dlt.table(
name="raw_player_logs",
comment="ゲームの生ログデータ"
)
def raw_player_logs():
return spark.readStream.format("cloudFiles").load("/path/to/logs")
CREATE OR REFRESH LIVE TABLE raw_player_logs
AS SELECT * FROM read_files("/path/to/logs")
CREATE OR REFRESH LIVE TABLE hogehoge ってどんなコマンド?
SQLにおける CREATE OR REFRESH LIVE TABLE は、Delta Live Tables (DLT) というフレームワーク特有の構文です。
通常のSQL(Spark SQL)の CREATE TABLE とは決定的に違う、「宣言型(Declarative)」という考え方が反映されています。一言でいうと、「このテーブルを常に最新の状態に保て(手順はお任せする)」という命令です。
それぞれの単語の意味を解剖してみましょう。
1. 各キーワードの役割
- CREATE OR REFRESH
「テーブルがなければ作る。すでにあるなら、中身を最新のデータに更新(リフレッシュ)する」という意味です。手動でINSERTやUPDATEを書く必要がなくなります。 - LIVE
これが最も重要です。このテーブルが DLTパイプラインの一部 であることを示します。通常のテーブルとは違い、背後で「依存関係の管理」や「自動リトライ」が行われる特別な存在になります。 - TABLE
結果を物理的な Delta Table として保存します(対照的に、一時的な計算結果ならLIVE VIEWを使います)。
2. なぜこれを使うのか?(データエンジニアのメリット)
データエンジニアとしての実務(Databricks利用時)では、以下のような面倒な作業がすべて自動化されます。
- 増分更新の自動化: ソースデータに新しいログ(ゲームのプレイ記録など)が追加されたとき、前回の続きから自動で読み込んでくれます。
- 依存関係の解決: 例えば「Goldテーブル」が「Silverテーブル」を参照している場合、Silverが更新された後にGoldを動かす、といった実行順序をDLTが勝手に判断します。
- スキーマの進化: ゲームのアップデートでログのカラムが増えた場合も、オプション設定(Schema Evolution)次第で自動対応できます。
spark.readStream.format(“cloudFiles”)についてcloudFiles以外のオプションは?
| オプション | 読み取り対象 | 主なメリット | デメリット |
|---|---|---|---|
| cloudFiles | クラウド上のファイル | スキーマ推論、スケーラビリティ | Databricks特有の機能 |
| delta | Deltaテーブル | 極めて高速、正確な1回限りの処理 | 元データがDelta形式である必要あり |
| parquet / csv | 標準ファイル | 汎用性が高い | ファイル増大時にパフォーマンスが劣化 |
| kafka / kinesis | イベントストリーム | 超低遅延 | データの保持期間(リテンション)に制限あり |
WINDOW関数と.windowは別物?
ややこしいがF.window()というまた別のものもある。
1. SQL の WINDOW 関数
標準的なSQLで、特定の範囲(パーティション)に対して計算を行うための構文です。
- 記述場所:
SELECT文の中。 - 特徴:
OVER (PARTITION BY ... ORDER BY ...)という句を使います。
SELECT user_id, score, -- これがSQLのWINDOW関数 AVG(score) OVER (PARTITION BY user_id ORDER BY date) as rolling_avg FROM game_logs
2. DataFrame API の .window() (PySpark)
Pythonのコード内で、SQLの OVER 句と同じ「計算の枠組み」を定義するためのメソッドです。
- 記述場所:
- Python コード(
pyspark.sql.Windowモジュール)。
- Python コード(
- 特徴:
- まず「窓の定義」を作り、それを
over()メソッドに渡します。
- まず「窓の定義」を作り、それを
from pyspark.sql import Window
import pyspark.sql.functions as F
# 1. 窓(WindowSpec)を定義する
w = Window.partitionBy("user_id").orderBy("date")
# 2. 関数に適用する
df.withColumn("rolling_avg", F.avg("score").over(w))
| 比較項目 | SQL (WINDOW関数) | PySpark (.window) |
|---|---|---|
| 可読性 | SQLに慣れていれば直感的。 | 変数として定義できるので、再利用しやすい。 |
| 動的な操作 | 文字列操作が必要。 | Pythonの変数として条件分岐などで扱いやすい。 |
| 戻り値の型 | カラムの値そのもの。 | WindowSpec オブジェクト(定義情報)。 |
| 項目 | Window オブジェクト (WINDOW関数) | F.window() 関数 (Time Windowing) |
|---|---|---|
| 主な用途 | 行を維持したまま、周囲の統計値を出す | データを一定時間ごとに「集約」する |
| セットで使う句 | .over(window_spec) |
groupBy(F.window(...)) |
| 結果の行数 | 変わらない (元のデータと同じ行数) | 減る (時間枠ごとに1行にまとまる) |
| 指定方法 | partitionBy, orderBy, rows/rangeBetween |
windowColumn, windowDuration, slideDuration |
| 主な例 | 直近3レコードの移動平均、ランキング付 | 5分ごとのログイン数集計、1時間ごとの売上 |
時間あたりのグルーピングに関しては、F.window() の方が圧倒的に便利で強力です。
F.window()の具体的なイメージ
import pyspark.sql.functions as F
# 10分間隔でデータを集約する
df_aggregated = df.groupBy(
F.window(F.col("event_time"), "10 minutes")
).agg(
F.count("user_id").alias("login_count")
)
transform関数の利点:ゲームログのクレンジングのシナリオ
以下の3つの処理を行いたいとします。
- テストユーザーの除外(
filter_test_users) - スコアの正規化(
normalize_score) - 不正な移動速度のフラグ立て(
add_cheater_flag)
sparkのtransformがあると便利な例
1. .transform() を使わない場合(変数地獄)
中間変数がたくさん増えてしまい、コードが読みづらくなります。
# 関数定義
def filter_test_users(df):
return df.filter(df.user_id != "test_user")
def normalize_score(df):
return df.withColumn("score_norm", df.score / 100)
def add_cheater_flag(df):
return df.withColumn("is_cheater", df.speed > 500)
# 実行(変数だらけになる)
df1 = filter_test_users(raw_df)
df2 = normalize_score(df1)
final_df = add_cheater_flag(df2)
欠点:
df1,df2といった使い捨ての変数が並び、処理の順番を入れ替えたり、一部をコメントアウトしたりするのが面倒です。
2. .transform() を使う場合(スッキリ!)
メソッドチェーンで上から下に流れるように記述できます。
# 実行
final_df = (raw_df
.transform(filter_test_users)
.transform(normalize_score)
.transform(add_cheater_flag)
)
dlt.read_streamの便利さ
| 項目 | 標準 Spark (spark.readStream) | DLT (dlt.read_stream) |
|---|---|---|
| チェックポイント管理 | 手動。パスを指定して管理が必要。 | 自動。DLTが裏で勝手に管理。 |
| スキーマ推論/進化 | 定義が厳格。進化には設定が必要。 | 柔軟。自動的に新しい列を検知可能。 |
| 依存関係の解決 | 手動。実行順序を制御する必要がある。 | 自動。テーブル間の依存をグラフで解決。 |
| リトライ・回復 | 自前で再起動ロジックを書く必要あり。 | 自動。パイプラインが自動復旧。 |
| 可視化 | UI上でデータフローは見えない。 | リネージ(系譜)がGUIで見える。 |
- 具体的なコードと手間の違い
# 標準Sparkでの典型的な書き方
df = (spark.readStream
.format("cloudFiles") # Auto Loader
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoints/schema")
.load("/input/path"))
(df.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoints/data") # 毎回指定が必要
.trigger(availableNow=True)
.table("target_table"))
import dlt
@dlt.table
def target_table():
return dlt.read_stream("/input/path") # これだけで依存関係と管理が完了
3. dlt.read_stream が圧倒的に便利な3つの理由
① 「チェックポイント地獄」からの解放
標準Sparkでは、テーブルごとに checkpointLocation を管理しなければなりません。これを間違えるとデータが重複したり壊れたりしますが、DLTはパイプラインの設定として一括管理されるため、個別のコードに書く必要がありません。
② リネージ(データフロー)の自動生成
dlt.read_stream を使うと、Databricksが自動的にテーブル間の繋がりを解析し、美しいグラフを描いてくれます。
「どのテーブルが止まっているか」「どこでレコードが減ったか」がGUIで一目瞭然になるのは、開発中のゲームのデバッグで「どのフラグが原因でバグが起きたか」を一瞬で特定できる感覚に似ています。
③ データ品質管理(Expectations)との統合
前回の質問にあった expect などの制約は、dlt.read_stream とセットで使うことで真価を発揮します。ストリーミングで流れてくるデータに対して、「不正なデータは落とす」という処理を1行添えるだけで実装できるのはDLTならではの特権です。
Liquid Clusteringをテーブルに適用するコマンド
1. テーブルを新規作成する場合
CLUSTER BY 句を使用します。
2.SQL ver
CREATE TABLE my_catalog.my_schema.game_logs (
event_id STRING,
user_id STRING,
event_time TIMESTAMP,
event_type STRING
)
CLUSTER BY (user_id, event_type); -- ここでカラムを指定
PySpark (DataFrame API) ver
(df.write
.format("delta")
.clusterBy("user_id", "event_type") # クラスターカラムを指定
.saveAsTable("my_catalog.my_schema.game_logs"))
マテリアライズドビューを作成するコマンドは?
以下
import dlt
from pyspark.sql import functions as F
@dlt.table(
name="stage_stats_mv",
comment="ステージごとの集計データ(マテリアライズドビュー)"
)
def stage_stats_mv():
return (
dlt.read("raw_game_logs")
.groupBy("stage_id")
.agg(
F.count("user_id").alias("play_count"),
F.avg("clear_time").alias("avg_time")
)
)
複雑な財務処理をマテリアライズドビューを用いて、計算を効率化したい場合の具体的なコード例
import dlt
from pyspark.sql import functions as F
# 1. 通貨換算と税計算を行う中間MV(Silver層)
@dlt.table(
name="financial_transactions_cleaned",
comment="通貨換算と税計算済みの明細データ"
)
def transactions_cleaned():
return (
dlt.read("raw_sales")
.join(dlt.read("exchange_rates"), "currency_code")
.withColumn("amount_jpy", F.col("amount") * F.col("exchange_rate"))
.withColumn("tax_amount", F.col("amount_jpy") * 0.10) # 簡易的な10%税計算
.withColumn("net_amount", F.col("amount_jpy") + F.col("tax_amount"))
)
# 2. 経営ダッシュボード用の月次集計MV(Gold層)
# ここが「計算を効率化」する本番。複雑なGROUP BYを事前に済ませて実体化する。
@dlt.table(
name="monthly_financial_summary_mv",
comment="月次・部署別の財務サマリー(マテリアライズドビュー)",
table_properties={"quality": "gold"}
)
def monthly_summary():
return (
dlt.read("financial_transactions_cleaned")
.groupBy(
F.window(F.col("transaction_timestamp"), "1 month").alias("month"),
"department_id",
"region"
)
.agg(
F.sum("amount_jpy").alias("total_sales_jpy"),
F.sum("tax_amount").alias("total_tax_jpy"),
F.sum("net_amount").alias("total_net_profit"),
F.count("transaction_id").alias("transaction_count")
)
)
この構成のメリット(なぜ効率化されるのか)
- 再計算の回避
BIツール(TableauやDatabricks SQL)から参照する際、毎回「外貨換算」や「1ヶ月分の集約」を行う必要がありません。すでに計算済みのmonthly_financial_summary_mvを読み込むだけなので、応答速度がミリ秒単位になります。 - 依存関係の自動管理
dlt.read()を使っているため、元のraw_sales(生データ)やexchange_rates(為替レート)が更新されると、DLTパイプラインが自動的にこのMVを再計算(リフレッシュ)してくれます。 - 計算の共通化
税計算ロジックなどを複数の部署で共有する場合、1つのMVで計算しておけば、各部署のクエリでロジックが微妙にずれるミスを防げます。
Shallow Clone vs Deep Clone
| 特徴 | Shallow Clone (浅いコピー) | Deep Clone (深いコピー) |
|---|---|---|
| データ実体 | コピーしない(メタデータのみ) | 全データをコピーする |
| 依存性 | 元のデータに依存する | 完全に独立している |
| コスト/速度 | 非常に高速、ストレージ消費ほぼゼロ | 時間がかかる、ストレージ代が2倍 |
| 試験の罠 | 元のテーブルで VACUUM を実行すると、Shallow Cloneしたテーブルは参照先を失い壊れる可能性がある。 |
ソーステーブルを削除しても、Deep Clone側には影響がない。 |
| 推奨シーン | 短時間のテスト、開発環境での検証 | 本番データのバックアップ、リージョンを跨いだ移行 |
Job Parameters と Task Valuesの違いとは
ジョブ(ワークフロー)内での「値の受け渡し」についての深い理解が問われます。
- Job / Task Parameters:
- ジョブ全体の開始時に渡す「定数」のようなもの。
dbutils.widgets.get()などで取得し、ジョブ内の全タスクで共通して利用可能。
- Task Values:
- 「タスクAの結果をタスクBで使いたい」 という動的な受け渡しに使用。
dbutils.jobs.taskValues.set(key="my_key", value=123)で保存し、後続タスクでgetする。- 試験ポイント: 「タスク間で動的なステータスや計算結果を共有する最適な方法は?」と聞かれたらこれ。
Job / Task Parameters例
- Parameter Key: process_date
- Parameter Value: 2026-03-12
# パラメータの取得
process_date = dbutils.widgets.get("process_date")
# 取得した日付を使ってデータをフィルタリング
df = spark.table("raw_sales").filter(f"order_date = '{process_date}'")
Adaptive Query Execution
Spark 3.0からの目玉機能。実行中に「統計情報」を見て、クエリプランを動的に書き換える機能です。
試験で問われる「AQEができる3つの魔法」:
- Coalescing Post-shuffle Partitions: シャッフル後に細かくなりすぎたパーティションを自動で結合し、ファイル数過多を防ぐ。
- Switching Join Strategies: 実行中に「あれ、意外とこっちのテーブル小さいな」と判断したら、Sort Merge Joinから Broadcast Hash Join に勝手に切り替える。
- Optimizing Skew Joins: データの偏り(スキュー)を検知して、重いパーティションを細かく分割して処理を均一化する。
is_member() 関数とは
- 実装例
CREATE FUNCTION sales_row_filter(region STRING)
RETURN
is_account_group_member('admin_group') OR region = 'Japan';
- 試験ポイント: 「特定のマネージャーには全データを、一般社員には自分の地域のデータだけを見せたい」というシナリオで、
MASKではなくROW FILTERと組み合わせて使う。
| 種類 | 該当する単語 |
|---|---|
| 予約語 / キーワード | CREATE, FUNCTION, RETURN, OR |
| 識別子(ユーザー定義) | sales_row_filter (関数名), region (引数名) |
| 組み込み関数 | is_account_group_member |
Sort Mergeとは
大規模テーブル同士を結合する際の「重厚な」デフォルトの結合手法です。
- ステップ:
- Shuffle: 結合キーに基づいてデータを全ノードに再配置。
- Sort: 各ノード内でデータをキー順に並べ替える。
- Merge: 並んだもの同士を上からガッチャンコして結合。
- 試験ポイント: * Broadcast Hash Join が使えない(両方デカすぎる)時の最終手段。
- Shuffleコストが高い ため、可能な限り
Z-OrderやLiquid Clusteringでデータを整理しておくことで、このスキャンやソートを効率化できる。 spark.sql.shuffle.partitionsの設定値が、このJoinのパフォーマンスに直結する。
- Shuffleコストが高い ため、可能な限り
APPLY CHANGESの具体例
【前】ソースデータ(Streaming Source / CDC Feed)
変更が順番に届くログ形式です。
| user_id | name | address | __sequence_num | __is_delete |
|---|---|---|---|---|
| 101 | 田中 | 東京 | 1 | false |
| 101 | 田中 | 大阪 | 2 | false |
| 102 | 佐藤 | 名古屋 | 3 | false |
| 101 | 田中 | 大阪 | 4 | true |
【後】ターゲットテーブル(APPLY CHANGES 適用後)
APPLY CHANGES は重複を排除し、最新の順序(sequence_num)に基づいてマージします。
| user_id | name | address |
|---|---|---|
| 102 | 佐藤 | 名古屋 |
タグを使用するケースとは
1. セキュリティとコンプライアンス(PII管理)
最も一般的かつ重要な用途です。個人情報(PII)や機密情報が含まれるカラムにタグを付けます。
- タグの例:
Sensitivity: High,PII: Email,GDPR: Relevant - メリット: どのテーブルに個人情報があるか一目で分かり、監査レポートの作成が容易になります。将来的に、特定のタグが付いたカラムを一括で非表示にするような「属性ベースのアクセス制御(ABAC)」の基盤になります。
2. データカタログの検索性向上(データ発見)
数千のテーブルがある環境では、名前だけでは中身が判断できません。ユーザーがデータを探しやすくするためにタグを使います。
- タグの例:
Project: Marketing_Alpha,Status: Gold,Domain: Sales - メリット: Catalog Explorer の検索窓に「Marketing」と打ち込むだけで、関連するテーブルを即座に絞り込めます。
appsセクションとvolumeセクションをyamlで定義する場合のg設定例を教えて。※特にVolumesが指定するappsの項目はapps自体のna定義を指定するか、apps配下のnameの値使うのか、定義自体の.idをとるのか?確認したい
resources:
volumes:
# (A) ここが Volume の定義(リソースキー)
my_data_volume:
catalog: main
schema: default
name: "actual_volume_name_in_uc"
apps:
my_shiny_app:
name: "prod-app"
# (B) ここで Volume を指定する
volumes:
- source_volume: my_data_volume # ← (A)の「定義キー」を指定する
path: /mnt/data # App内でのマウント先
pandasの方がpyspackより選ばれるケースは?シーケンシャルな処理をカットして計算処理を効率化できる?とか?
| ケース | 理由 |
|---|---|
| 小規模なデータセット | 数GB程度(ドライバのメモリに収まる)なら、Sparkのタスク分割や通信のオーバーヘッドがないPandasの方が圧倒的に速い。 |
| 複雑な時系列解析 | 前の行の結果を次の行で使うような、本質的に並列化できない(Sequentialな)処理。 |
| 豊富なライブラリ連携 | Scikit-learn, SciPy, Statsmodelsなど、Pandasを前提とした高度な統計・機械学習ライブラリを使いたい場合。 |
| EDA(探索的分析) | 試行錯誤の段階では、Lazy Evaluation(遅延評価)のSparkよりも、即座に結果が出るPandasの方が開発効率が良い。 |
PARTATIONBYと、GROUPBYの具体的な違いをわかりやすく
元データ(Source Table)
| 注文ID (ID) | 部署 (Dept) | 売上 (Sales) |
|---|---|---|
| 1 | 営業 | 500 |
| 2 | 営業 | 300 |
| 3 | 技術 | 1000 |
パターン1:GROUP BY(集計)
- 「部署ごとの合計売上を知りたい」という場合です。
SELECT Dept, SUM(Sales) as Total FROM orders GROUP BY Dept
結果
| 部署 (Dept) | 合計 (Total) |
|---|---|
| 営業 | 800 (500 + 300) |
| 技術 | 1000 |
パターン2:PARTITION BY(Window関数)
- 「各注文の横に、その部署の合計売上を並べたい」という場合です。
SELECT ID, Dept, Sales,
SUM(Sales) OVER (PARTITION BY Dept) as Dept_Total
FROM orders
結果
| 注文ID (ID) | 部署 (Dept) | 売上 (Sales) | 部署合計 (Dept_Total) |
|---|---|---|---|
| 1 | 営業 | 500 | 800 |
| 2 | 営業 | 300 | 800 |
| 3 | 技術 | 1000 | 1000 |
比較表
| 比較項目 | PARTITION (repartition) | GROUP BY |
|---|---|---|
| 行数 (Row Count) | 変わらない | 減る(集計されるため) |
| 主な目的 | パフォーマンス向上、ファイル管理 | データ分析、要約、重複排除 |
| シャッフル | 必ず発生する(全データの移動) | 必ず発生する(キーごとの移動) |
| 実行タイミング | write の直前、join の前後 |
変換処理の途中、最終集計時 |
CASTの使い方
-- 文字列 '20260325' を日付型にしたい場合
SELECT to_date('20260325', 'yyyyMMdd');`
外部テーブルのDFを定義した後にfilterしたらFile Skippingされなかった。なんで?
1. 統計情報(Statistics)が収集されていない
File Skipping は、各ファイルの「最小値(min)」と「最大値(max)」をメタデータとして持っているからこそ、「このファイルには探している値はないな」と判断してスキップできます。
- 原因: 外部テーブル(特に非Delta形式や、Databricks以外で作成されたテーブル)の場合、Sparkが各ファイルの統計情報を正確に把握できていないことがあります。
- 対策:
ANALYZE TABLEコマンドを実行して、明示的に統計情報を収集してください。SQLANALYZE TABLE table_name COMPUTE STATISTICS FOR ALL COLUMNS;
2. パーティション列以外でフィルターしている
これは最も初歩的かつ強力な理由です。
- 原因: テーブルが
dateでパーティション化されているのに、user_idでフィルターをかけても、Sparkはどのフォルダ(パーティション)にどのuser_idがあるか分かりません。 - 結果: すべてのパーティションをスキャンし、その中の全ファイルを開いて中身を確認せざるを得なくなります。
- 対策: フィルター条件にパーティション列を含めるか、Delta Lake の Z-Order を使用して非パーティション列でもスキップを効かせるようにします。
3. ファイル形式が「スキップ」に対応していない
- 原因: ソースが CSV や JSON の場合、ファイル自体に「このカラムの最小値はこれ」といったメタデータを持っていません。そのため、Sparkはファイルを最後まで読み切らないと、データがあるかどうか判断できません。
- 対策: Parquet または Delta 形式を使用してください。これらは列指向(Columnar)フォーマットであり、ファイルフッターに統計情報を持っているため、File Skipping が強力に機能します。
コンピュートポリシーってカスタム環境変数とか作れる?あと、ライブラリをクラスターで使いまわしたい時はどうすべき?
以下のように設定可能
{
"spark_env_vars.ENV_STAGE": {
"type": "fixed",
"value": "PROD"
},
"spark_env_vars.API_KEY": {
"type": "forbidden"
}
}
データフレームのキャッシュって考えないといけない場面ある?
| 特徴 | Sparkキャッシュ (RAM) | Deltaキャッシュ (SSD/Disk) |
|---|---|---|
| コマンド | df.cache() |
自動、または CACHE SELECT |
| 保存先 | クラスターのメモリ(RAM) | ワーカーノードのローカルSSD |
| 形式 | デシリアライズされたSpark形式 | 圧縮されたParquet/Delta形式 |
| 使いどころ | 同じDFを何度も使い回す時 | 同じテーブルを何度もクエリする時 |
一部のjsonで形式は正しいのに失敗データとして扱われてた。これってどう言う理由でおこる?例えばjsonが途中で複数jsonで送られたからとか、rescueの設定がおかしかったから?
1. 最も多い理由:Multiline(複数行)設定の不一致
JSONデータが「1つのファイルに1つのオブジェクト」なのか「1行に1つのオブジェクト(JSONL)」なのかの設定ミスです。
- 現象: JSON自体は正しいが、1つのファイル内に複数のJSONオブジェクトが含まれている。
- 原因: * デフォルト(
multiline=false)では、Sparkは 「1行 = 1レコード」 として読み取ります。- ファイルが
[ {...}, {...} ]のように配列形式だったり、改行を含むきれいなJSON(Pretty Print)だったりすると、2行目以降が解析できず失敗扱いになります。
- ファイルが
- 対策: 読み込みオプションに
.option("multiline", "true")を追加する。
ALL PRIVILEGES で「できること」
付与されたオブジェクトのタイプ(カタログ、スキーマ、テーブルなど)に応じて、以下のすべての標準的な操作が可能になります。
- テーブルの場合:
SELECT(読み取り),MODIFY(書き込み/更新),METADATA(定義参照) など。 - カタログ/スキーマの場合:
USE CATALOG,USE SCHEMA,CREATE TABLE,CREATE VOLUMEなど。
Databricks bundle deployとは、使用例
開発者の動き(CLI操作)
- コードを書く:
databricks.ymlにジョブやパイプラインの定義を書く。 - 検証する:
databricks bundle validateで「書き間違いがないか」をチェック。 - デプロイする:
databricks bundle deployを実行。 - 動かす:
databricks bundle run(またはUIから実行)。
| コマンド | 用途 | 自動化に向いているか |
|---|---|---|
databricks bundle dev |
ローカル開発中のリアルタイム同期。ファイルを保存するたびに即座に反映されます。 | 不向き(対話型処理のため) |
databricks bundle deploy |
一括反映。特定のバージョンを「正解」としてデプロイします。 | 最適(CI/CDに組み込む標準) |
DLT(Delta Live Tables)のexpect機能を使った場合、正常、異常のログって具体的にどう言うデータが残る?
| 項目名 | 説明 |
|---|---|
name |
expect で定義した制約の名前(例: valid_id) |
passed_records |
制約をパスした正常なレコード数 |
failed_records |
制約に違反した異常なレコード数 |
dropped_records |
expect_or_drop で破棄されたレコード数 |
quarantined_records |
(カスタム実装時)隔離用テーブルに送られた数 |
CDCに関して、削除レコード重複してても大丈夫する仕組みってどんなもの?
Databricks(特に Delta Live Tables や Delta Lake)において、CDC(Change Data Capture)データに削除レコードの重複が含まれていても正しく処理できる仕組みについて解説します。
結論から言うと、主な仕組みは 「SEQUENCE BY による順序制御」 と 「決定論的な最新レコードの選択」 です。
1. SEQUENCE BY カラムによる制御
DLTの APPLY CHANGES INTO では、必ず SEQUENCE BY という引数を指定します。ここには通常、ソースシステムでの「イベント発生時刻」や「ログ・シーケンス番号(LSN)」を指定します。
- 仕組み: 同じキー(例:
user_id=101)を持つ複数の変更レコードが届いた場合、SparkはSEQUENCE BYで指定された値が最も大きい(最新の)ものを特定します。 - 重複への対応: もし「削除(DELETE)」というステータスのレコードが全く同じ内容で2つ重複して届いても、それらは同じシーケンス値(または前後する値)を持ちます。システムは最終的に「このキーは削除されるべき」という最新の状態だけをターゲットテーブルに反映するため、2回削除命令が来ても結果は同じ(冪等性が保たれる)になります。
権限管理とカタログ (Unity Catalog)
USE CATALOG の限界
USE CATALOG 権限は、メタデータへのアクセスを許可するだけです。
- できること: カタログ内のスキーマ一覧を表示する (
SHOW SCHEMAS)。 - できないこと: スキーマ内のテーブルを読み書きする、スキーマを作成する。
- 試験の罠: テーブルを SELECT するには、
USE CATALOG+USE SCHEMA+SELECTの 3段階すべて が必要です。
2. 実装:CDC (APPLY CHANGES INTO)
SCD(ゆっくり変化するディメンション)の実装例です。
SCD Type 1(上書き)
APPLY CHANGES INTO live.target_table FROM stream(live.source_stg) KEYS (user_id) SEQUENCE BY logical_timestamp -- 順序制御 COLUMNS * EXCEPT (operation) STORED AS SCD TYPE 1;
SCD Type 2(履歴保持)
試験では 「履歴カラムがどう生成されるか」 が問われます。
APPLY CHANGES INTO live.target_table FROM stream(live.source_stg) KEYS (user_id) SEQUENCE BY logical_timestamp STORED AS SCD TYPE 2; -- __start_at, __end_at などのフラグが自動生成される
- 重複排除の仕組み:
SEQUENCE BYカラムの値が同じレコードが複数来た場合、DLT は非決定的な選択を避けるため、内部的な決定論的アルゴリズムで1つを選びます(通常は最後に届いたもの)。
4. 運用・トラブルシューティング
Secret 管理 (CLI & dbutils)
- Scope作成:
databricks secrets create-scope my-scope - 保存:
databricks secrets put-secret my-scope my-token(対話モード) - Pythonで取得:Python
token = dbutils.secrets.get(scope="my-scope", key="my-token")
💡 print(token) しても "[REDACTED]" と表示され、漏洩を防ぐ仕様です
5. 用語クイック確認表
| 機能 | 特徴・使いどころ |
|---|---|
| Bucketing | 特定のカラムでデータを物理的に分割。JOIN キーで Bucket しておくと Shuffle を回避 できる。 |
| Query Profile | クエリ実行計画の可視化。「Stage」単位で表示され、どこで時間がかかったか(Spill to disk など)を確認。 |
| Append Only | TBLPROPERTIES ('delta.appendOnly' = 'true')。削除・更新を物理的にロックする。 |
| MaxBytesPerTrigger | Streaming で「1バッチあたりのデータ量」を制限し、メモリ不足を防ぐ。 |
| System Tables | system.information_schema で権限、system.billing でコストを確認。 |
GitHub Actions のトリガータイミング
基本は push や pull_request です。
init: プロジェクト初期化validate: Bundle の設定ミスがないかチェック(デプロイ前)deploy: 開発/ステージング環境へリソースを作成run: デプロイされたジョブを実行してインテグレーションテスト
Query Profile(クエリプロファイル)とは
Query Profile(クエリプロファイル)は、Databricks SQLやノートブックで実行されたクエリが、物理的にどう実行されたかを詳細に分析するためのツールです。ボトルネックを探す際の「健康診断書」のようなものだと考えてください。
試験対策として、以下の主要な取得項目と概念を押さえておきましょう。
1. 実行の構成単位:タスクとステージ
Query Profileを開くと、クエリが複数の Task(タスク) や Stage(ステージ) に分割されているのが見えます。
- Stage: データのシャッフル(再配置)が発生するタイミングで区切られる大きな実行単位です。
- Task: 各エグゼキューターで並列実行される最小の単位。特定のタスクだけ異常に遅い場合、「データスキュー(偏り)」が疑われます。
2. 各演算子(Operator)ごとの詳細メトリクス
クエリプランの各ノード(Scan, Filter, Join, Aggregateなど)をクリックすると、以下の項目が確認できます。
データの読み込み関連
- Files read / Files skipped: *
Files skippedが多いほど、データスキッピング(Z-Orderやパーティション)が上手くいっています。 - Size of data read: ストレージから読み込まれた物理的なデータ量。
- Metadata interaction time: カタログやファイルシステムとのやり取りにかかった時間。
演算(計算)関連
- Number of output rows: その工程から次の工程へ渡された行数。
- Wall-clock time: その演算子が実際に動作していた合計時間。
- CPU time: 実際にCPUを消費した時間(並列処理されているため、Wall-clockより長くなることがあります)。
メモリとディスク関連(非常に重要)
- Peak memory usage: 各演算子が使用した最大メモリ量。
- Spill to disk: 最重要チェック項目です。メモリに収まりきらなかったデータがディスクに書き出された量。これが記録されていると、パフォーマンスが劇的に低下します。
is_group_memberみたいな関数ってある?どう使う?
is_account_group_member() という関数がそれにあたります。
また、ワークスペース単位のグループ判定には is_member() を使います。
これらは主に 「動的ビュー(Dynamic Views)」 や 「行レベルフィルタ / 列レベルマスク」 で、ログインしているユーザーの権限に応じてデータの見え方を変えるために使用されます。
1. 主要な関数とその違い関数名確認の範囲主な用途
| 関数名 | 確認の範囲 | 主な用途 |
|---|---|---|
is_account_group_member('group_name') |
アカウント全体のグループ | Unity Catalog環境での標準的な権限チェック。 |
is_member('group_name') |
ワークスペース内のグループ | 特定のワークスペースに閉じた権限チェック。 |
current_user() |
現在のユーザー名 | 実行者のメールアドレスを取得。 |
削除ベクター機能って何が便利?
1. 従来(削除ベクターなし)の動き:Copy-on-Write
従来のDelta Lakeでは、1行でもデータを削除または更新(UPDATE)する場合、その行が含まれるファイル全体(通常128MB〜1GB)を読み込み、該当行を除いて新しいファイルとして書き出す必要がありました。
- 問題点: たった数行の削除のために、巨大なファイルのIOが発生し、時間がかかる。書き込みの競合(Conflict)も起きやすい。
2. 削除ベクターありの動き
削除ベクターを有効にすると、元のデータファイルには一切手をつけません。代わりに、「どの行が削除されたか」を記録した小さなビットマップファイル(削除ベクター)を別に作成します。
- 削除時: 元ファイルはそのまま。小さな「削除リスト」をピッと作るだけ(超高速)。
- 読み込み時: 元ファイルを読み込む際、削除ベクターを参照して「削除済み」とマークされた行をメモリ上でスキップして結果を返します。
3. 何が便利なの?(メリット)
- 削除・更新の劇的な高速化:
数GBあるテーブルから1行消す処理が、数分から数秒に短縮されます。特にGDPRの「忘れられる権利」対応や、CDC(変更データキャプチャ)の頻繁な更新に極めて有効です。 - 書き込み競合(Conflict)の低減:
ファイルを物理的に作り直さないため、複数の書き込み処理が同じファイルをターゲットにしていても衝突しにくくなります。 - ストレージコストの抑制:
一時的に「古いファイル + 新しいファイル」が共存する期間がなくなる(または減る)ため、ストレージのスパイクを抑えられます。
Streamテーブルの具体的な使用例
1. 【Bronze】S3のJSONをAuto Loaderで取り込む
このフェーズでは、cloud_files という関数を使い、S3に新しく置かれたJSONファイルを検知してストリーミングテーブルに書き込みます。
SQLでの記述例
CREATE OR REFRESH STREAMING TABLE bronze_orders
AS SELECT * FROM cloud_files(
"s3://my-bucket/raw/orders/", -- ソースのパス
"json", -- ファイル形式
map("cloudFiles.inferColumnTypes", "true") -- スキーマ推論などのオプション
)
試験のポイント:
STREAMINGキーワード: これを忘れると、ただの1回限りのコピー(Batch)になってしまいます。cloud_files: これがAuto Loaderを呼び出す呪文です。ファイルが増えるたびに、新着分だけを取り込みます。
2. 【Silver】届いた分だけを既存テーブルに「ガッチャンコ」
Bronzeテーブルに入ってきた「新しいレコードだけ」を加工して、Silverテーブルに流し込む設定です。
SQLでの記述例
CREATE OR REFRESH STREAMING TABLE silver_orders AS SELECT order_id, customer_id, cast(order_timestamp as timestamp) as order_time, -- 型変換などの加工 upper(status) as status FROM STREAM(live.bronze_orders) -- 「STREAM()」が重要!
試験のポイント:なぜ「ガッチャンコ」になるのか?
STREAM(live.table_name): これがキモです。これを付けることで、Databricksは内部的に「checkpoint(どこまで読んだかの記録)」を参照します。- 増分処理: 既に
silver_ordersに入っているデータは無視され、bronze_ordersに新しく入ってきた行(まだ処理していない行)だけが抽出され、silver_ordersの末尾に追加されます。
3. 実例:レコードが流れる様子
- S3に
file_A.jsonが置かれるbronze_ordersに3行追加される。silver_ordersがbronzeの新規3行を検知し、加工して自分の末尾に追加する。
- S3に
file_B.jsonが置かれるbronze_ordersはfile_Bの分だけを追加。(file_Aは無視)silver_ordersもfile_B由来の新規データだけを追加。- 結果として、テーブルはどんどん下に伸びていく(Append-only)。