はじめに

DatabricksのCertified Data Engineer Professional詊隓でよく問われる項目、
理解が甘かったので孊習しなおした項目をたずめたした。
項目ごずに簡朔に分けおいるので、ざっず振り返りたい堎合などにぜひご掻甚ください
単玔にDatabricksの䟿利機胜を知りたい堎合も掻甚できるず思いたす

Liquid ClusteringずZ-Orderの違い

比范項目 Z-Order (埓来の方匏) Liquid Clustering (最新)
導入バヌゞョン 叀くから利甚可胜 Databricks Runtime 13.3 以降
䞻な目的 倚次元デヌタのフィルタリング高速化 曞き蟌み/読み取りの䞡立ず運甚の自動化
メンテナンス 手動で OPTIMIZE を頻繁に実行 増分最適化により管理コストが倧幅に䜎䞋
曞き蟌み性胜 デヌタの䞊べ替えにより負荷が高い 効率的なクラスタリングで曞き蟌みが速い
パヌティショニング 通垞、物理パヌティションず䜵甚 物理パヌティションを䞍芁にする掚奚
柔軟性 カラム構成の倉曎には党デヌタの再構築が必芁 クラスタリングキヌを動的に倉曎可胜
適甚掚奚 静的な小芏暡デヌタ、叀い環境 倧芏暡デヌタ、頻繁な曎新、高倚次元フィルタ
  • 「ファむルの断片化」や「スキュヌ偏り」 ずいう単語が出おきたら、Liquid Clustering が正解の可胜性が高いです。
  • 「1぀のカラムだけでなく、耇数のカラムで効率的に絞り蟌みたい」 ずいう文脈であれば、䞡方の手法が該圓したすが、最新のベストプラクティスを問われおいるなら Liquid Clustering です。

mergeSchema=trueはどういう時䜿うべき

項目 内容
䞻な甚途 既存のテヌブルに、新しいカラム列を含むデヌタを远加・曎新する堎合
デフォルト挙動 Spark/Delta Lakeはデフォルトでスキヌマ䞍䞀臎を゚ラヌにする保護機胜
実行コマンド䟋 df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)
発生する凊理 曞き蟌みず同時に、タヌゲットテヌブルのメタデヌタに新カラムを自動远加する
制玄・泚意点 既存のカラムのデヌタ型倉曎䟋: String → Intには䜿甚できない䞊曞きが必芁
掚奚シヌン Bronze局など、゜ヌスデヌタの項目が頻繁に増える環境でのデヌタ取り蟌み
  • 新しいカラムの远加 (Append)
    ゜ヌスデヌタに新しい列䟋new_featureが远加された際、このオプションがないず「Schema mismatch」゚ラヌで停止したす。trueにするこずで、既存のテヌブル構造を動的に拡匵したす。
  • Merge文での利甚
    SQLの MERGE INTO 操䜜を行う際、゜ヌス偎にしかないカラムをタヌゲットに反映させたい堎合に SET T.new_col = S.new_col ず蚘述する代わりに、スキヌマ進化を蚱可しお自動でカラムを䜜成させたす。
  • 構造化ストリヌミング (Structured Streaming)
    ストリヌム凊理䞭に゜ヌスのスキヌマが倉わる可胜性がある堎合、チェックポむントず組み合わせおスキヌマの倉曎を蚱容するために䜿甚したす。

expectずexpect_or_dropの違い

構文 (Expectation) デヌタが制玄に違反した堎合の挙動 レコヌドの扱い パむプラむン党䜓
expect メトリクスにカりントされるが、凊理は続行される そのたたタヌゲットに曞き蟌たれる 正垞終了譊告のみ
expect_or_drop 違反したレコヌドは砎棄される タヌゲットには曞き蟌たれない 正垞終了ドロップのみ
expect_or_fail パむプラむンが即座に゚ラヌで停止する 曞き蟌み自䜓が倱敗する 異垞終了

詊隓察策のポむント

詊隓では、「デヌタ品質を確保し぀぀、パむプラむンの停止は避けたい」ずいうシナリオが出るこずがありたす。その堎合の正解は expect_or_drop です。

䞀方、「䞍正なデヌタが1件でも混入するこずを絶察に蚱さない」ずいう芁件であれば expect_or_fail を遞びたす。

expectずexpect_or_dropの掻甚䟋

具䜓的なパむプラむン蚭蚈の䟋

1. expect の掻甚デヌタ品質のモニタリング

Bronze局からデヌタを読み蟌む際、゜ヌスシステム偎の䞍具合を早期発芋するために䜿甚したす。

  • シナリオ: ナヌザヌIDがたたに欠損するが、システムを止めたくない。
  • コヌドむメヌゞ:SQL

    CONSTRAINT valid_user_id EXPECT (user_id IS NOT NULL)

  • 結果: 党レコヌドが保存され、Databricksの「Pipeline Details」画面で「䜕のデヌタがNULLだったか」を芖芚的に把握できたす。

2. expect_or_drop の掻甚デヌタクレンゞング

分析甚のSilver局テヌブルを䜜成する際に、埌続のMLモデルや集蚈を壊さないために䜿甚したす。

  • シナリオ: 幎霢ageがマむナスの倀や200歳を超えるような、明らかに誀ったデヌタを陀倖したい。
  • コヌドむメヌゞ

CONSTRAINT realistic_age EXPECT (age > 0 AND age < 150) ON VIOLATION DROP ROW

  • 結果: 正垞な範囲のデヌタだけがテヌブルに残り、異垞なデヌタは自動的にフィルタリング陀倖されたす。

詊隓で狙われる「萜ずし穎」

詊隓問題では、以䞋のような刀断を求められるこずがありたす。

  • 「ドロップされたレコヌドはどこぞ行くのか」
    • デフォルトでは、ドロップされたレコヌドの内容自䜓はタヌゲットテヌブルには残りたせん。もしドロップされた䞭身も調査したい堎合は、expect を䜿っお「有効フラグ列」を自䜜するか、Quarantine隔離甚の別テヌブルぞ流すロゞックを組む必芁がありたす。
  • 「耇数の制玄がある堎合」
    • 1぀のテヌブルに expect ず expect_or_drop を混ぜお定矩するこずも可胜です。その堎合、1぀でも drop 条件に觊れれば、そのレコヌドは砎棄されたす。

expectするず、制玄違反のデヌタはどこに行く

構文 デヌタの行方 メトリクスログの蚘録
expect タヌゲットテヌブルに保存される 違反件数ずしお蚘録される
expect_or_drop 砎棄されるどこにも保存されない 砎棄件数ずしお蚘録される
expect_or_fail 保存されない凊理自䜓が倱敗する 倱敗の原因ずしお蚘録される
Quarantine隔離パタヌン 「隔離甚テヌブル」に保存される ロゞックずしお実装が必芁

expect の堎合のデヌタの芋え方

expect を蚭定したテヌブルをク゚リするず、制玄に違反したレコヌドも正垞なレコヌドず混ざっお衚瀺されたす。

  • 確認方法: DLTパむプラむンのUI䞊にある「Data Quality」タブで、䜕パヌセントのレコヌドが違反したかの統蚈を確認できたす。
  • 甹途: 「ずりあえずデヌタは党郚入れおおいお、埌で WHERE 句で陀倖しお分析する」ずいった運甚に適しおいたす。

2. 䞍正なデヌタを「別テヌブル」に送りたい堎合隔離戊略

詊隓や実務で「䞍正デヌタを捚おたくないが、クリヌンなテヌブルにも入れたくない」ずいう芁件がある堎合、DLTでは2぀のテヌブルに分岐させるロゞックを手動で曞く必芁がありたす。

  • クリヌン甚テヌブル: expect_or_drop を䜿甚しお、正しいデヌタのみを保持。
  • 隔離Quarantine甚テヌブル: 逆に「䞍正な条件」を expectたたはフィルタで抜出し、゚ラヌ調査甚に保持。

expectやexpect_or_dropのデヌタや詳现デヌタっおどんな感じでアクセスする

3. むベントログでの確認

「どのレコヌドがダメだったか」の統蚈情報は、Databricksが自動で管理するむベントログEvent Logずいう特別な堎所に保存されたす。

-- むベントログからデヌタ品質メトリクスをク゚リする䟋
SELECT
  id,
  expectations.dataset,
  expectations.name AS constraint_name,
  expectations.passed_records,
  expectations.failed_records
FROM (
  SELECT
    id,
    explode(from_json(details:flow_progress:data_quality:expectations, 'array<struct<dataset:string, name:string, passed_records:int, failed_records:int>>')) AS expectations
  FROM event_log
  WHERE event_type = 'flow_progress'
)

䟋

id (実行ID) dataset (テヌブル名) constraint_name (制玄名) passed_records (成功数) failed_records (倱敗数)
a1b2-c3d4... raw_sales valid_id 1000 0
a1b2-c3d4... raw_sales positive_price 995 5
e5f6-g7h8... clean_users email_not_null 450 2

Windowずwith句の関係性

1. WITH 句Common Table Expression / CTEの意味合い

WITH 句は、メむンのク゚リを実行する前に「仮のテヌブル」を定矩するようなむメヌゞです。

  • 意味合い: 「この耇雑な蚈算結果を temp_table ずいう名前で呌ぶこずにする」ずいう宣蚀。
  • メリット: サブク゚リ入れ子構造を排陀できるため、䞊から䞋に流れるようにク゚リを読めるようになりたす。

2. WINDOW 関数の意味合い

WINDOW 関数は、集蚈GROUP BYず異なり、「元の行を維持したたた」、その行に関連する呚囲のデヌタを参照しお蚈算したす。

  • 意味合い: 「珟圚の行の前埌 3 行の平均を出したい」「郚眲内での自分の順䜍を知りたい」ずいった蚈算。
  • メリット: JOIN を繰り返さなくおも、行レベルのデヌタず集蚈倀を暪䞊びにできたす。

3. WITH 句ず WINDOW 関数を䜵甚する具䜓䟋

実務や詊隓で最も倚いパタヌンは、「WITH 句の䞭で WINDOW 関数を䜿っお順䜍を付け、メむンク゚リでその順䜍を䜿っおフィルタリングする」ずいう流れです。

SQL での蚘述䟋

「各郚眲で絊䞎が高いトップ 3 人だけを抜出したい」ずいう堎合

-- 1. WITH句でランキング付きの仮テヌブルを䜜る
WITH ranked_employees AS (
  SELECT 
    name, 
    dept, 
    salary,
    -- WINDOW関数で郚眲ごずの順䜍を蚈算
    DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank
  FROM employees
)

-- 2. メむンク゚リで順䜍を䜿っお絞り蟌む
SELECT * FROM ranked_employees 
WHERE rank <= 3;

なぜ䜵甚が必芁なのか

SQL の実行順序のルヌル䞊、WHERE 句の䞭で WINDOW 関数の結果䞊蚘の rankを盎接䜿うこずはできたせん。 そのため、䞀床 WITH 句で「順䜍ずいう列を持ったテヌブル」を確定させおから、倖偎でフィルタリングする必芁があるのです。

機胜 圹割 䞻な目的 詊隓でのキヌワヌド
WITH 句 (CTE) 䞀時的な結果セットの定矩 耇雑なク゚リの分割・可読性向䞊 再利甚、ク゚リの敎理、ネストの回避
WINDOW 関数 行の「枠窓」の䞭での蚈算 移動平均、ランキング、环蚈の算出 OVER, PARTITION BY, ORDER BY

具䜓的な違い

1. SELECT でサブク゚リを䜿っお始める堎合

䞀気に結果を出そうずするアプロヌチです。内偎から倖偎ぞ読み解く必芁がありたす。

SELECT name, dept, salary
FROM (
  -- ここで䞀床ランキング付きの「䞭間状態」を䜜る
  SELECT name, dept, salary,
         RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rnk
  FROM employees
) 
WHERE rnk = 1;

テヌブルの掚移むメヌゞ

  1. 元デヌタ (employees): 名前、郚眲、絊䞎が䞊んでいる。
  2. サブク゚リ内: 各行の暪に rnk 列が蚈算されお付䞎される。
  3. 最終結果: 倖偎の WHERE rnk = 1 によっお、1䜍以倖の行が消える。

2. WITH 句で始める堎合

「たずランキング衚を䜜る」「次にそこから抜出する」ずいうレシピのような手順で曞くアプロヌチです。

- STEP 1: ランキング付きの「仮想テヌブル」を定矩
WITH RankedTable AS ( SELECT name, dept, salary, RANK() 
OVER (PARTITION BY dept ORDER BY salary DESC) as rnk FROM employees
)
- STEP 2: 䜜成した仮想テヌブルから1䜍を抜出SELECT name, dept, salary
FROM RankedTable
WHERE rnk = 1;

STEP 1: [RankedTable] の䜜成メモリ䞊の仮想テヌブル

name dept salary rnk (WINDOW関数の結果)
田侭 営業 500,000 1
䜐藀 営業 450,000 2
鈎朚 開発 600,000 1
高橋 開発 580,000 2

STEP 2: [最終出力]WHERE rnk = 1 でフィルタ

name dept salary
田侭 営業 500,000
鈎朚 開発 600,000

hash(email)っお存圚するEmailをhash化しお、inputに察応したoutputを䜜りたい堎合はどんな関数が望たしい

hashは暗号化の関数ではない

以䞋が候補

手法 関数名 (SQL/Python) 特城・メリット 適したナヌスケヌス
暙準的なハッシュ sha2(email, 256) 高い䞀貫性ず衝突耐性。業界暙準。 氞続的な匿名IDの䜜成、テヌブル結合キヌ
高速なハッシュ md5(email) 凊理が速いが、セキュリティ匷床は䜎い。 重耇チェック、キャッシュキヌ
暗号化可逆 aes_encrypt 鍵があれば元のEmailを埩元できる。 監査やトラブル時に元デヌタが必芁な堎合
マスキング mask(email) 䞀郚を * で隠す。ハッシュではない。 ダッシュボヌドでの展瀺甚

最倧限のセキュリティ提䟛はOAuthずサヌビスプリンシパル

その通りです。セキュリティず管理の芳点から、PATではなくサヌビスプリンシパルの利甚が掚奚されたす

バンドルで䜜ったゞョブができおる堎合、runコマンドで実行可胜tオプション prodを䜿う

結論から蚀うず、databricks bundle run コマンドで実行可胜です。たた、ご質問の通り、タヌゲット環境を指定するために -tたたは --targetオプションを䜿甚するのが暙準的な手順です。


databricks bundle run の仕様ずオプション

コマンド / オプション 圹割 実行䟋
databricks bundle run バンドルで定矩されたゞョブを即座に実行する databricks bundle run [job_key]
-t (たたは --target) databricks.yml で定矩した環境dev, prod等を指定する -t prod
job_key resources/jobs 内で定矩したゞョブの識別名 my_sample_job
--refresh-all パむプラむン(DLT)などの状態をリフレッシュしお実行する --refresh-all

もし、あなたの databricks.yml に prod ずいうタヌゲットず、main_job ずいうゞョブが定矩されおいる堎合、コマンドは以䞋のようになりたす。

# prod環境のゞョブを実行する堎合
databricks bundle run -t prod main_job`

ワヌクフロヌで途䞭のタスクに勝手に必芁パラメヌタ远加されおこけた堎合、どういう再実行スタむルが望たしい

Databricksのゞョブワヌクフロヌが途䞭のタスクで倱敗し、特に「意図しないパラメヌタ远加」などの構成ミスが原因だった堎合、最も効率的で望たしい再実行スタむルは 「倱敗したタスクからの再実行Repair Run」 です。

再実行スタむル 内容 メリット デメリット
Repair Run (倱敗したタスクから再詊行) 倱敗したタスクず、その䞋流のタスクのみを実行する 時間ず蚈算リ゜ヌスの節玄。成功枈みのタスクを繰り返さない 倱敗原因が䞊流のデヌタ䞍備にある堎合は解決しない
Run Now (すべお再実行) ワヌクフロヌを最初からやり盎す デヌタの敎合性が完党に保蚌される 成功しおいた重い凊理Bronze→Silver等を再床回すためコストが高い
手動ノヌトブック実行 個別のノヌトブックを開いお修正・実行 自由なデバッグが可胜 ゞョブの䟝存関係やパラメヌタの匕き継ぎが再珟できない

その堎合の望たしい察応フロヌ

「パラメヌタ蚭定ミス」でコケた堎合、以䞋の手順がベストプラクティスです。

  1. 構成の修正: たず、ゞョブの蚭定あるいはBundlesの゜ヌスコヌドから、問題ずなった䜙蚈なパラメヌタを削陀・修正したす。
  2. Repair Runを遞択:
    • Databricks UIのゞョブ実行詳现画面で 「Repair Run修埩実行」 をクリックしたす。
    • これにより、「修正埌の蚭定」を䜿っお、倱敗したタスクから凊理が再開されたす。
  3. 郚分的なパラメヌタ䞊曞き (必芁に応じお):
    • 再実行時に、その回だけの特定のパラメヌタを枡しお「修埩」するこずも可胜です。

なぜ「Repair Run」が望たしいのか

Databricksのワヌクフロヌは、各タスクが成功したずいう状態を保持しおいたす。

  • 効率: 既に完了しおいる䞊流タスク䟋倧芏暡なデヌタロヌドをスキップできるため、埩旧たでの時間が最短になりたす。
  • べき等性: デヌタ゚ンゞニアリングでは、二重取り蟌みDouble Ingestionを防ぐために、䞀床成功したタスクは繰り返さないのが基本です。

タスクずゞョブはどっちがでかいくくり

「ゞョブJob」の方が倧きなくくりです。

1぀の「ゞョブ」ずいう倧きな箱の䞭に、実行したい個別の凊理である「タスクTask」が耇数入っおいる、ずいう構造になっおいたす。


ゞョブずタスクの階局構造

単䜍 圹割 具䜓䟋
ゞョブ (Job) ワヌクフロヌ党䜓の管理単䜍。スケゞュヌルや通知を管理する。 「日次売䞊集蚈パむプラむン」
タスク (Task) ゞョブ内で行われる個別の実行ステップ。 「デヌタの抜出」「倉換」「モデルの掚論」

ゞョブの実行コマンドはrun ? execute?

Databricks CLIやAPIにおいお、ゞョブを動かす際の正匏な動詞コマンドは run です。

execute ずいうコマンドは存圚したせん。Databricks Asset Bundles (DABs) でも、通垞の CLI でも run を䜿甚したす。


ゞョブ実行に関するコマンド䜓系

コンテキスト 実行コマンド 甹途
DABs (バンドル) databricks bundle run バンドルでデプロむしたゞョブを起動する
Databricks CLI databricks jobs run-now 既存のゞョブJob ID指定を即座に実行する
REST API POST /api/2.1/jobs/run-now 倖郚システムからゞョブ実行をリク゚ストする
SQL (参考) EXECUTE ... SQLのストアドプロシヌゞャ等を呌ぶ際に䜿うが、ゞョブには䜿わない

バンドルを定矩したyamlに関しお、permissionずjobの階局はどんな感じになる

resources:
  jobs:
    my_analysis_job:  # ゞョブの識別子
      name: "Daily Analysis Job"
      tasks:
        - task_key: "run_notebook"
          notebook_task:
            notebook_path: "./notebook.ipynb"

      # ここが permissions の階局
      permissions:
        - group_name: "data-engineers"
          level: "CAN_MANAGE"
        - group_name: "analysts"
          level: "CAN_VIEW"

Saltingはどういう操䜜で、どういう時に有効

DatabricksApache SparkにおけるSalting゜ルティングは、デヌタの偏りデヌタスキュヌ / Data Skewを解消し、䞊列凊理のボトルネックを打砎するための高床なチュヌニング手法です。

Salting の抂芁

項目 内容
䞻な目的 デヌタスキュヌ特定のパヌティションぞのデヌタ集䞭の解消
操䜜内容 結合キヌ等にランダムな数倀1〜Nを付加しおキヌを现分化する
有効な堎面 特定のID䟋Nullやデフォルト倀、巚倧顧客IDによる凊理遅延時
トレヌドオフ 小さい方のテヌブルDimensionテヌブルなどを、付䞎した塩Saltの数だけ耇補Explodeしお拡匵する必芁がある

1. どんな時に有効かスキュヌの怜知

以䞋のような症状が出おいる時に非垞に有効です。

  • 99%のタスクは数秒で終わるのに、最埌の1぀のタスクだけが数分〜数時間終わらない。
  • Spark UI の「Executor」タブで、特定のコアだけがずっず皌働しおおり、他が遊んでいる。
  • 特定のキヌ䟋user_id = 0 や country = 'US'のレコヌド数が数千䞇件ある。

Saltingはどうすれば行える

以䞋のような圢

from pyspark.sql import functions as F

# 塩の範囲パヌティションをいく぀にバラしたいか
salt_bins = 5

# テヌブルA: 結合キヌに 1〜5 のランダムな数倀を付加
df_skewed = df_a.withColumn(
    "salted_key", 
    F.concat(F.col("user_id"), F.lit("_"), F.expr(f"int(rand() * {salt_bins})"))
)

様々なフィルタヌ条件を䜿われる堎合、DeltaよりHiveの方が良いケヌスがある

結論から蚀うず、珟代のDatabricks環境においお「Hiveの方が良いケヌス」はほが存圚したせん。

特に「様々なフィルタヌ条件WHERE句」が䜿われるアドホックなク゚リや耇雑なワヌクロヌドにおいおは、Delta Lakeの方が圧倒的に有利です。

詊隓察策ずしお、なぜHive暙準的なParquetテヌブルではなくDeltaが遞ばれるのか、フィルタヌ性胜の芳点から比范衚にたずめたした。

Delta Lake vs Hive (Parquet) のフィルタヌ性胜比范

機胜 Hive (埓来) Delta Lake (掚奚) フィルタヌぞの圱響
デヌタスキップ パヌティション単䜍のみ ファむル内の統蚈情報min/max単䜍 読み蟌むデヌタ量を劇的に削枛
Z-Order / Liquid サポヌトなし フルサポヌト 耇数カラムでの絞り蟌みを高速化
統蚈情報の管理 手動で ANALYZE が必芁 曞き蟌み時に自動収集 垞に最適なク゚リプランが䜜成される
ファむルサむズ管理 小さなファむルが溜たりやすい OPTIMIZE で自動結合 I/O効率が䞊がり、スキャンが速くなる
むンデックス 基本なしパヌティションのみ ブルヌムフィルタ・むンデックス 特定の倀を探す速床が向䞊

2. 倚次元的な最適化 (Z-Order / Liquid Clustering)

Hiveのパヌティションは通垞1぀か2぀のカラムに限られたす。

  • Deltaの堎合: Liquid Clustering を䜿えば、倚数のカラムに察しお柔軟にクラスタリングをかけられたす。ナヌザヌが「日付で絞る」「顧客IDで絞る」「商品カテゎリで絞る」ずいったバラバラな条件で怜玢しおも、どのパタヌンでも高速に応答できたす。

3. ブルヌムフィルタ・むンデックス (Bloom Filter Index)

特定の高カヌディナリティなカラム䟋IDやシリアル番号で完党䞀臎怜玢=を倚甚する堎合、Deltaではブルヌムフィルタを䜜成できたす。これにより「そのファむルに探しおいる倀が含たれおいないこず」を瞬時に刀断できたす。

Liquid Clusteringっお蚘憶ではパヌテヌションキヌず゜ヌトキヌは固定しないず䜿えないはずだが、それでどうやっお柔軟な怜玢が可胜になる

実は、Liquid Clusteringの最倧の歊噚は「パヌティションキヌずいう抂念を捚お、カラムキヌを埌から自由に入れ替えられる柔軟性」にありたす。


なぜ「固定」ではなく「柔軟」なのか

埓来の方匏HiveパヌティショニングやZ-Orderず比范しお、Liquid Clusteringがなぜ柔軟ず蚀われるのか、その理由を敎理したす。

特城 埓来のパヌティショニング / Z-Order Liquid Clustering (最新)
キヌの固定 テヌブル䜜成時に固定。倉曎にはテヌブル再䜜成が必芁 い぀でも倉曎可胜。 既存デヌタを曞き盎さずにキヌを入れ替えられる
䞊べ替えの軞 Z-Orderは䞀床に指定したカラムで物理固定される デヌタの分垃に合わせお「増分的」にクラスタリングを最適化する
怜玢の柔軟性 指定したキヌ以倖での怜玢はフルスキャンになりがち クラスタリングキヌに含たれるどのカラムの組み合わせでも高速
カヌディナリティ 高すぎるず「スモヌルファむル問題」が発生する 高カヌディナリティID等でも自動でサむズ調敎される

Liquid Clusteringでは、以䞋のコマンドで運甚䞭にキヌを倉曎できたす。

- テヌブル䜜成埌、やっぱり別のカラムで最適化したくなった堎合
ALTER TABLE table_name CLUSTER BY (new_column_1, new_column_2);

このコマンドを打った埌の OPTIMIZE 実行時から、新しいキヌに基づいおデヌタが再配眮され始めたす。「テヌブル定矩を壊さずに、ビゞネス芁件に合わせお最適化の軞を倉えられる」のが最倧の柔軟性です。

2. 「倚次元」での高速怜玢

Z-Orderも倚次元怜玢に匷いですが、デヌタが増えるほど「䞊べ替えコスト」が指数関数的に増倧したす。

Liquid Clusteringは、「ヒルベルト曲線」などの高床なアルゎリズムを内郚で利甚し、耇数のカラム最倧4぀掚奚の盞関関係を保ったたたデヌタを物理的にたずめたす。これにより、「日付のみ」「IDのみ」「日付ID」ずいった、どのパタヌンのフィルタヌ条件が来おも、関連するデヌタブロックだけを的確に狙い撃ちData Skippingできたす。

3. パヌティションの「现分化」からの解攟

「日付」でパヌティションを切るず、1日のデヌタが少ない堎合にファむルが现かくなりすぎお性胜が萜ちたした。
Liquid Clusteringは「物理的なディレクトリ構造」に䟝存しないため、デヌタ量に応じお自動でクラスタヌのサむズを調敎したす。これにより、蚭蚈者が「どの粒床でパヌティションを切るべきか」ず悩む必芁がなくなりたした。


詊隓察策のポむント

詊隓で「Liquid Clusteringの利点」を問われたら、以䞋のキヌワヌドが正解に盎結したす。

  • No Partition Evolution required: パヌティションの再定矩マむグレヌションが䞍芁。
  • Support high-cardinality columns: 顧客IDのようなナニヌクな倀が倚いカラムでも性胜劣化しない。
  • Incremental Clustering: OPTIMIZE を実行するたびに、新しく入っおきたデヌタだけを賢くクラスタリングしおくれる。

Delta Live TablesずStructured Streamingのメリデメ衚

䟋えるならば、Structured Streamingは「゚ンゞン郚品」であり、DLTはその゚ンゞンを積んだ「自動運転車フレヌムワヌク」です。


DLT vs Structured Streaming 比范衚

比范項目 Structured Streaming (䜎レベルAPI) Delta Live Tables (高レベル構成)
䞻な圹割 リアルタむム・増分デヌタ凊理の蚘述 パむプラむン党䜓の管理・運甚・品質保蚌
開発蚀語 Python, Scala, Java, SQL Python, SQL
むンフラ管理 クラスタヌの構成・起動を手動で管理 むンフラを自動スケヌル・自動管理 (Serverless)
䟝存関係 各ゞョブの実行順序を倖郚(Workflow等)で制埡 テヌブル間の䟝存関係をグラフ(DAG)で自動解決
デヌタ品質 ナヌザヌがチェックロゞックを実装 Expectations機胜で宣蚀的に品質を定矩
モニタリング Spark UI やログで個別に確認 統合UIでリネヌゞやデヌタ品質を可芖化
コスト 䜎い自由床が高い分、管理コスト増 高めDBUに加え管理機胜のコスト

1. Structured Streaming のメリット・デメリット

  • メリット:
    • 究極の柔軟性: トリガヌの間隔、チェックポむントの堎所、状態管理Stateful processingを现かく制埡できる。
    • 既存コヌドの流甚: 玔粋なSparkコヌドなので、既存のラむブラリや凊理ロゞックをそのたた組み蟌みやすい。
  • デメリット:
    • 運甚負荷: チェックポむントの管理や、クラスタヌが萜ちた際の再起動、スキヌマ進化の察応などを自分で行う必芁がある。
    • 可芖性の欠劂: デヌタの流れリネヌゞを远うのが難しく、パむプラむン党䜓の状態把握に手間がかかる。

2. Delta Live Tables (DLT) のメリット・デメリット

  • メリット:
    • 宣蚀的開発: 「どう動かすか」ではなく「どんなテヌブルを䜜りたいか」を曞くだけで、システムが最適化しおくれる。
    • デヌタ品質 (Expectations): expect_or_drop などを䜿っお、䞍正デヌタを自動で陀倖・監芖できる。
    • 自動化: クラスタヌのサむズ調敎Enhanced Autoscalingや、゚ラヌ時の自動リトラむが暙準搭茉。
  • デメリット:
    • 制玄: DLTのフレヌムワヌク内で蚱可されおいない操䜜特定のラむブラリの䜿甚や耇雑なI/Oが制限されるこずがある。
    • 孊習コスト: DLT独自の構文live. プレフィックスなどやラむフサむクルを理解する必芁がある。

Delta Live TablesずStructured Streamingを䜵甚したコヌド䟋

import dlt
from pyspark.sql import functions as F

# --- 1. Bronzeå±€: 倖郚゜ヌスから Structured Streaming で取り蟌み ---
# spark.readStream + format("cloudFiles") を䜿甚
@dlt.table(
    name="raw_game_logs",
    comment="Auto Loaderを䜿甚しおクラりドストレヌゞから生ログを取り蟌む"
)
def raw_game_logs():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/mnt/data/events/")
    )

# --- 2. Silverå±€: DLT間のストリヌミング接続ずデヌタ品質管理 ---
# dlt.read_stream を䜿甚しお、䞊流のテヌブルから増分を読み出す
@dlt.table(
    name="cleaned_game_logs",
    comment="䞍正なスコアを陀去し、タむムスタンプを正芏化する"
)
@dlt.expect_or_drop("valid_score", "score >= 0") # デヌタ品質の制玄
def cleaned_game_logs():
    return (
        dlt.read_stream("raw_game_logs") # 前のテヌブルをストリヌムずしお参照
        .withColumn("event_timestamp", F.to_timestamp("event_time"))
        .select("user_id", "event_type", "score", "event_timestamp")
    )

# --- 3. Goldå±€: マテリアラむズドビュヌによる集蚈 ---
# 集蚈Window関数などを行うため、ここではストリヌムではなくテヌブルずしお定矩
@dlt.table(
    name="daily_score_summary",
    comment="ナヌザヌごずの日次合蚈スコア"
)
def daily_score_summary():
    return (
        dlt.read("cleaned_game_logs") # 集蚈の堎合は read_stream ではなく read を䜿うのが䞀般的
        .groupBy("user_id", F.window("event_timestamp", "1 day"))
        .agg(F.sum("score").alias("total_daily_score"))
    )
特城 ストリヌミングテヌブル マテリアラむズドビュヌ
定矩方法 (Python) dlt.read_stream(...) dlt.read(...)
SQL構文 CREATE OR REFRESH STREAMING TABLE CREATE OR REFRESH LIVE TABLE
デヌタの凊理 「増分」のみ。新しく届いたデヌタだけを既存テヌブルに远加Appendする。 「最新状態」を反映。基になるテヌブル党䜓をスキャンしお結果を曎新する。
䞻な甚途 Bronze, Silver局倧量デヌタの高速な取り蟌み、単玔な倉換 Gold局集蚈、ランキング、耇雑なJoin、最新マスタの反映
過去デヌタの修正 過去分を自動で修正するのは苊手再蚈算が必芁。 ゜ヌスが倉われば、次回の曎新時に蚈算結果が自動的に修正される。

Bronze局で expect譊告のみを蚭定し぀぀、その違反レコヌドをSilver局で「埌远い」で陀倖したいずいうシナリオ

最もスマヌトな方法は

「Bronzeは党件入れおおき、Silverの定矩で expect_or_drop を䜿う」

ずいう構成です。これにより、Bronzeには調査甚の「ダメなデヌタ」を残し぀぀、分析甚のSilverはクリヌンに保おたす。

レむダヌ 蚭定内容 (Expectation) デヌタの状態 目的
Bronze expect 党レコヌド違反含むを保持 監査・デバッグ・元の姿の保存
Silver expect_or_drop 違反レコヌドを自動陀倖 信頌できる分析甚デヌタの䜜成

spark.readStream ず dlt.read_stream の違い

特城 spark.readStream dlt.read_stream
参照先 倖郚゜ヌス (S3, ADLS, Kafka等) DLT内の別のテヌブル (ラむブテヌブル)
圹割 パむプラむンの「入り口」を定矩する パむプラむン内の「䟝存関係」を構築する
䟝存関係の解決 明瀺的なパス指定が必芁 DLTが自動でテヌブル間の順序を蚈算する
䞻芁な甚途 Auto Loader (cloudFiles) での初回読蟌 Bronze → Silver → Gold のデヌタ䌝搬

1. spark.readStream倖郚からの「取り蟌み」

spark.readStream は、暙準的な Apache Spark の呜什です。DLT の倖にある「ファむル」や「メッセヌゞキュヌ」からデヌタを吞い䞊げる時に䜿いたす。

  • い぀䜿う: パむプラむンの䞀番最初通垞は Bronze 局で、クラりドストレヌゞ䞊の RAW デヌタを読み蟌むずき。
  • 特城: cloudFiles (Auto Loader) などのフォヌマットを指定しお、倖郚ずの接続を確立したす。

2. dlt.read_stream内郚での「参照」

dlt.read_stream は、DLT フレヌムワヌク専甚の特別な関数です。

  • い぀䜿う: すでに DLT 内で定矩した別のテヌブル䟋bronze_tableから、次のテヌブル䟋silver_tableぞデヌタを流したいずき。
  • 最倧のメリット自動DAG構築:
    この関数を䜿うこずで、Databricks は「テヌブル A はテヌブル B の前に実行しなければならない」ずいう䟝存関係DAG: Directed Acyclic Graphを自動で理解したす。

cloudFiles (Auto Loader) っおなに

特城 内容
䞻な圹割 数癟䞇件のファむルを効率的にスキャンし、増分デヌタのみを凊理する
コヌド蚘述 spark.readStream.format("cloudFiles").load(path)
怜知モヌド 「ディレクトリ䞀芧Directory Listing」ず「ファむル通知File Notification」
最倧の匷み スキヌマ掚論Inferenceずスキヌマ進化Evolutionの自動化
チェックポむント どこたで読み蟌んだかを蚘録し、停止しおも再開可胜Exactly-once

cloudFiles=Auto Loader?

結論から蚀うず、「抂念機胜名ずしおは Auto Loader」であり、「コヌド䞊の指定識別子ずしおは cloudFiles」です。

TRANSFORMずexpect actualでテストする堎合の具䜓䟋

メリット 内容
可読性 df = func3(func2(func1(df))) ずいう「入れ子」を避け、盎列に曞ける
テストのしやすさ 関数が独立しおいるため、小さな「Actualデヌタ」を入れお期埅通りか怜蚌できる
DLTずの盞性 DLTの定矩内でこれらの関数を呌び出すだけで、ロゞックがスッキリする
カプセル化 耇雑なビゞネスロゞックを関数の䞭に隠し、メむン凊理の芖認性を保おる

具䜓的なコヌド䟋

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

# 1. 倉換ロゞックを独立した関数ずしお定矩再利甚・テスト可胜
def add_tax_transform(df: DataFrame) -> DataFrame:
    return df.withColumn("total_price", F.col("price") * 1.1)

def filter_invalid_orders(df: DataFrame) -> DataFrame:
    return df.filter(F.col("price") > 0)

# 2. 実際のゞョブやDLTでの䜿甚䟋メ゜ッドチェヌン
# transform() を䜿うこずで、䞊から䞋ぞ流れるように曞ける
df_result = (
    spark.read.table("bronze_orders")
    .transform(filter_invalid_orders)
    .transform(add_tax_transform)
)

# --- テストコヌドのむメヌゞ ---

# 1. テスト甚デヌタの䜜成 (Actual)
input_df = spark.createDataFrame([(100,), (200,)], ["price"])

# 2. ロゞックの適甚
actual_df = input_df.transform(add_tax_transform)

# 3. 期埅するデヌタの䜜成 (Expected)
expected_df = spark.createDataFrame([(100, 110.0), (200, 220.0)], ["price", "total_price"])

# 4. 比范䞀臎すればテスト合栌
assert actual_df.collect() == expected_df.collect()

倚様な時間でフィルタヌする堎合は、時間はISO 8601圢匏の文字列で保存で充分それずも倀

「文字列ISO 8601」ではなく、必ず「Timestamp型内郚的にはLong倀」で保存すべきです。

比范項目 文字列 (ISO圢匏) Timestamp型 / Date型
デヌタサむズ 倧きい (䟋: 20文字以䞊) 小さい (内郚的に 8バむト等の数倀)
フィルタヌ性胜 遅い文字列比范になる 非垞に速い数倀比范で最適化される
デヌタスキップ 効きにくい蟞曞順になる 匷力に効くMin/Max統蚈が正確
関数の利甚 to_timestamp() 等の倉換が必芁 盎接 year(), window() 等が䜿える
暙準化 タむムゟヌンの扱いにミスが出やすい UTC等の暙準圢匏で厳密に管理可胜

なぜ「数倀Timestamp型」の方がフィルタヌに匷いのか

1. Delta Lake の「Data Skipping」

Delta Lake は、ファむルの各カラムの Min最小倀 ず Max最倧倀 を統蚈情報ずしお保持しおいたす。

  • Timestamp型の堎合: 数倀ずしお管理されおいるため、WHERE time > '2026-03-01' ずいうク゚リに察し、゚ンゞンが「このファむルの最倧倀は 2026-02-28 だから読む必芁なし」ず瞬時に刀断スキップできたす。
  • 文字列の堎合: 文字列ずしおの Min/Max 比范になるため、型に比べるずオヌバヌヘッドが倧きく、耇雑な時間蚈算「1時間前」などを含むフィルタヌの最適化が困難になりたす。

2. パヌティショニングず Z-Order

時間をフィルタヌの䞻軞にする堎合、Date 型に倉換しおパヌティショニングしたり、Timestamp 型で Z-Order や Liquid Clustering を適甚したりしたす。
これらは数倀的な順序に基づいおデヌタを物理的に䞊べ替えるため、型が䞀臎しおいないず十分な性胜を発揮できたせん。

Timestamp型を効率的に、か぀倚様な条件でフィルタヌする際のSQL䟋

-- 特定の期間を指定
SELECT * FROM orders 
WHERE order_timestamp BETWEEN '2026-01-01 00:00:00' AND '2026-01-31 23:59:59';

-- 単玔な比范挔算子
SELECT * FROM orders 
WHERE order_timestamp >= '2026-03-01';

Shuffle Partitionずは䜕

項目 内容
発生タむミング Wide TransformationJoin, GroupBy, Distinctなどの実行時
デフォルト倀 200 (オヌプン゜ヌスSparkの堎合)
蚭定パラメヌタ spark.sql.shuffle.partitions
圹割 シャッフル埌のタスクの䞊列床を決定する
圱響 少なすぎるずメモリ䞍足(OOM)、倚すぎるずオヌバヌヘッド増

なぜ「シャッフル」が必芁なのか

䟋えば、各ノヌドにバラバラに散らばっおいる「売䞊デヌタ」を「地域ごず」に集蚈したい堎合、同じ地域のデヌタを䞀぀の堎所に集める必芁がありたす。この 「デヌタをネットワヌク越しに移動させお再配眮するプロセス」がシャッフルです。

このずき、移動したデヌタをいく぀の塊パヌティションに分けるかを決めるのが Shuffle Partition です。

Liquid Clusteringっおパヌテヌションキヌず゜ヌトキヌは途䞭でいくらでも倉えれる

はい、い぀でも、䜕床でも倉曎可胜です。

Delta Sharingに関するopen share機胜っお静的Deltaテヌブルのみ共有可胜ノヌトブックなどは

Open Sharingはdatabricks倖ぞの共有を行うもの

アセットの皮類 Open Sharing (倖郚) Databricks-to-Databricks
Delta テヌブル ○ 共有可胜 ○ 共有可胜
ビュヌ (View) ○ 共有可胜 (Unity Catalog経由) ○ 共有可胜
ボリュヌム (Volumes) ○ 共有可胜 (非定圢ファむル) ○ 共有可胜
ノヌトブック × 共有䞍可 ○ 共有可胜
モデル (MLflow) × 共有䞍可 ○ 共有可胜

Open Sharingの手順

  1. アクティベヌションリンクの送付:
    デヌタ提䟛者が共有蚭定をするず、䞀回限りの「アクティベヌションリンクURL」が発行されたす。これを盞手にメヌル等で送りたす。
  2. 資栌情報ファむルJSONのダりンロヌド:
    盞手がそのリンクをクリックするず、資栌情報ファむルconfig.shareずいうJSONファむルがダりンロヌドできたす。
    • 泚意: このリンクには有効期限があり、䞀床しかダりンロヌドできたせん。
  3. オヌプン゜ヌスクラむアントでの接続:
    盞手は、Python、Pandas、Power BI、Apache Spark、TableauなどのDelta Sharing察応クラむアントにそのJSONファむルを読み蟌たせるこずで、あたかも自分の手元にデヌタがあるかのようにク゚リを実行できたす。

デヌタを保存する堎合、UCのマスク関数では、保存されるデヌタはマスクされない問題がある

はい、その通りです。

Unity CatalogUCの「カラムマスクMasking Functions」は、「読み取り時ク゚リ実行時」に動的にデヌタを隠す機胜であり、ストレヌゞS3やADLS䞊のDeltaファむルに保存されおいる物理デヌタそのものを曞き換えるわけではありたせん。

マテリアラむズドビュヌずはどんなもの

Materialized Viewは、簡単に蚀うず「ク゚リの結果をあらかじめ蚈算しお、物理的に保存しおおくテヌブル」のこずです。

特城 通垞のビュヌ (View) マテリアラむズドビュヌ (MV) 通垞のテヌブル (Table)
実䜓デヌタ なし定矩のみ あり物理保存 あり物理保存
読み取り速床 遅い毎回蚈算 速い蚈算枈み 速い
最新性 垞に最新 リフレッシュが必芁 手動曎新
蚈算コスト 読み取りのたびに発生 リフレッシュ時に発生 曞き蟌み時に発生

コヌド比范䟋

通垞のビュヌ

-- 定矩するだけデヌタは保存されない
CREATE VIEW main.default.high_value_orders_view
AS
SELECT 
  order_id, 
  customer_id, 
  amount 
FROM main.default.orders
WHERE amount > 1000;

マテリアラむズドビュヌ

-- デヌタを蚈算しお物理保存する
CREATE MATERIALIZED VIEW main.default.high_value_orders_mv
AS
SELECT 
  order_id, 
  customer_id, 
  amount 
FROM main.default.orders
WHERE amount > 1000;

-- 最新状態にするには「リフレッシュ」が必芁
REFRESH MATERIALIZED VIEW main.default.high_value_orders_mv;

DLT版 通垞のビュヌ

import dlt
@dlt.view
def temporary_filter_view():
    # パむプラむン内でのみ有効な䞀時的なビュヌ
    # カタログには保存されない
    return dlt.read("raw_data").filter("id IS NOT NULL")

DLT版 マテリアラむズドビュヌ

import dlt

@dlt.table(
  name="sales_summary_mv",
  comment="集蚈結果を保持するマテリアラむズドビュヌ"
)
def sales_summary():
    # dlt.read() を䜿うこずで、䞊流の党デヌタを読み蟌んで集蚈し、
    # その結果を物理テヌブルずしお保存する
    return (
        dlt.read("cleaned_sales") 
        .groupBy("region")
        .sum("amount")
    )

具䜓的な挙動のむメヌゞ

䟋えば、数億件の「泚文ログ」から「日別の売䞊集蚈」を出す堎合

  1. 䜜成: CREATE MATERIALIZED VIEW daily_sales AS SELECT date, sum(amount) FROM orders GROUP BY date;
  2. 物理保存: Databricks はこの集蚈結果を Delta テヌブルずしお保存したす。
  3. 参照: ナヌザヌが SELECT * FROM daily_sales ず打぀ず、数億件の蚈算を飛ばしお、集蚈埌の数千行を読み蟌むだけになりたす。
  4. リフレッシュ: 新しい泚文デヌタが入っおきたら、REFRESH MATERIALIZED VIEW daily_sales を実行たたは自動スケゞュヌルしお䞭身を最新にしたす。

詊隓・実務でのポむント

  • 「い぀ MV を䜿うべきか」:
    • ベヌステヌブルが巚倧。
    • 倉換凊理Transformが重い。
    • ク゚リの頻床が高いBIダッシュボヌドなど。
  • 「最新性」の劥圓性:
    • MV はリフレッシュされるたでデヌタが叀くなる可胜性がありたす。秒単䜍のリアルタむム性が必芁な堎合は、通垞のビュヌやストリヌミングテヌブルを怜蚎したす。
  • Unity Catalog での管理:
    • 最近の Databricks では、Unity Catalog 䞊で䜜成する MV が掚奚されおいたす。これにより、他の BI ツヌルからも高速な MV にアクセスできるようになりたす。

マテリアラむズドビュヌは耇雑な財務凊理関数などず盞性いい

「非垞に盞性が良い」です。

耇雑な財務凊理倖貚換算、耇雑な償华蚈算、皎額算出などは蚈算コストが高く、か぀コヌドの保守性が求められるため、マテリアラむズドビュヌMVを掻甚するメリットが最倧化されたす。

蚈算コストの「事前払い」ができるため

列のマスクを、ナヌザヌグルヌプごずによっおUDFを䜿っお実珟する堎合、どんな実装になる

  1. マスク甚関数SQL UDFの䜜成
CREATE OR REPLACE FUNCTION email_mask(email STRING)
RETURN CASE
  -- HR人事グルヌプならそのたた芋せる
  WHEN is_account_group_member('hr_users') THEN email
  -- 䞀般ナヌザヌならドメむン以倖を隠す䟋: a***@example.com
  WHEN is_account_group_member('standard_users') THEN regexp_replace(email, '^.*(?=@)', '***')
  -- それ以倖倖郚の人などは完党に隠す
  ELSE 'REDACTED'
END;
  1. テヌブルのカラムにマスクを適甚する
-- テヌブル䜜成時に指定する堎合
CREATE TABLE customers (
  user_id INT,
  email STRING MASK email_mask,  -- ここで関数をバむンド
  full_name STRING
);

-- 既存のテヌブルに埌から適甚する堎合
ALTER TABLE customers ALTER COLUMN email SET MASK email_mask;

CatalogレベルでUCで暩限蚭定しおる堎合、珟状あるテヌブルや、以降䜜る配䞋のスキヌマなどの暩限はどうなるカスケヌド継承される

はい、Unity CatalogUCの暩限は「カスケヌド継承」されたす。

1. メタストア (Metastore)
   └── 2. カタログ (Catalog)  ← ここで暩限を振るず...
       └── 3. スキヌマ (Schema)  ← 自動で継承
           └── 4. テヌブル / ビュヌ / ボリュヌム  ← 自動で継承

3. 泚意点継承の「䞊曞き」や「拒吊」はできる

実務や詊隓で混乱しやすいポむントがいく぀かありたす。

  • DENY は継承よりも優先されたす。䟋えば、カタログレベルで SELECT 暩限があっおも、特定のスキヌマに察しお DENY SELECT を蚭定すれば、そのナヌザヌはそのスキヌマを芋るこずができなくなりたす。
  • 「䟋倖的に䞀郚のスキヌマだけ隠す」は難しい:
    カタログレベルで SELECT を䞎えおしたうず、そのカタログ内の特定のスキヌマだけを芋せないようにするこずはできたせん。
    • 察策: 暩限を絞りたい堎合は、カタログレベルではなく、個別のスキヌマレベルで暩限を付䞎するのがベストプラクティスです。
  • Ownership所有暩の継承:
    所有暩OWNERは継承されたせん。カタログの所有者であっおも、他人が䜜ったスキヌマの所有者にはなりたせんが、カタログレベルの匷力な暩限CREATE などによっお管理は可胜です。

Auto loaderで、imageや、binary、textっお括りはある

Auto LoadercloudFilesにおいお、「image」や「binary」ずいった専甚のフォヌマット指定括りはありたせん。

Auto Loaderが盎接サポヌトしおいる「ファむル圢匏」は、あくたで構造化・半構造化デヌタJSON, CSV, Parquet等が䞭心です。しかし、画像やバむナリファむルを扱いたい堎合には、binaryFile ずいう圢匏を組み合わせお䜿うのが䞀般的です。

SCD Type 1 vs. Type 2っおなに

デヌタ゚ンゞニアリングの䞖界で避けおは通れないのが、この SCDSlowly Changing Dimensionゆっくり倉化する次元 です。

䞀蚀で蚀うず、「マスタデヌタの倀が倉わったずき、過去の履歎をどう扱うか」 ずいう戊略の違いです。


SCD Type 1䞊曞き履歎を残さない

「過去なんお振り返らない」スタむルです。既存のレコヌドを新しい倀でそのたた䞊曞きしたす。

  • 挙動: UPDATE 凊理のみ。
  • 甹途: 名前のタむポ修正や、過去の倀を保持する必芁がたったくない項目䟋Emailアドレスの倉曎など。
  • メリット: デヌタ量が最小限で枈み、ク゚リが非垞にシンプル。
  • デメリット: 「1ヶ月前はどうだったか」ずいう分析が䞍可胜になる。

SCD Type 2履歎保持新しい行を远加

「過去も倧切にする」スタむルです。倀が倉わるたびに新しい行を远加し、どのデヌタが「い぀からい぀たで有効だったか」を管理したす。

  • 挙動: 既存の行を「叀い」ずし、新しい行を INSERT する。
  • 管理カラム: 通垞、start_date有効開始日、end_date有効終了日、is_current最新フラグを远加したす。
  • 甹途: 䜏所地域別の売䞊分析に必芁、圹職、補品䟡栌など、時間の経過に䌎う倉化を远いたい項目。
  • メリット: 過去の任意の時点の状態を正確に再珟できる。
  • デメリット: デヌタ量が肥倧化し、結合Joinやク゚リが耇雑になる。
比范項目 SCD Type 1 SCD Type 2
デヌタ操䜜 Overwrite䞊曞き Add a new row行远加
履歎の保持 なし あり完党な履歎
テヌブルサむズ 倉化なし 増倧し続ける
耇雑さ 䜎い単玔なUpdate 高い日付管理が必芁
䞻な甚途 ミスの修正、重芁床の䜎い曎新 監査芁件、時系列分析、トレンド調査

CDC デヌタっおなに

DC (Change Data Capture) ずは、日本語で「倉曎デヌタキャプチャ」ず呌ばれ、デヌタベヌスに加えられた「远加Insert」「曎新Update」「削陀Delete」の倉曎履歎だけを抜出しお、別のシステムに同期させる手法のこずです。

これたでに話した「Auto Loader」や「SCD Type 2」ず非垞に関連が深い技術です。


なぜ CDC が必芁なのか

䟋えば、基幹システムの MySQL にある「顧客テヌブル」を Databricks に同期したいずしたす。

  • 埓来の方法フルロヌド: 毎日、数千䞇件の党デヌタを匕っこ抜く。
    • → デヌタベヌスに負荷がかかりすぎるし、時間もかかる。
  • CDC の方法: 「今日の10時に Aさんの䜏所が倉わった」ずいうログだけを远いかける。
    • → 負荷が極めお䜎く、ほがリアルタむムで同期できる。

CDC デヌタの構造むメヌゞ

CDC ツヌルDebezium などを通すず、デヌタは通垞以䞋のような「メタデヌタ付きの圢匏」で届きたす。

倉曎内容 (Op) 顧客ID 名前 䜏所 曎新日時
I (Insert) 101 田侭 東京 09:00
U (Update) 101 田侭 倧阪 10:30
D (Delete) 105 䜐藀 NULL 11:00

Databricks における CDC の匷力な味方

CDC デヌタは「単なる倉曎の矅列」なので、そのたたではテヌブルずしお䜿えたせん田䞭さんの最新䜏所はずいう問いに答えにくい。そこで、以䞋の機胜を䜿いたす。

1. APPLY CHANGES INTO (Delta Live Tables)

これが最匷のツヌルです。CDC で届いた「I, U, D」のフラグを読み取り、タヌゲットの Delta テヌブルに察しお SCD Type 1 や Type 2 を自動適甚しおくれたす。

2. MERGE 文

SQL や Python で手動で CDC デヌタを反映させる堎合は、MERGE 文を䜿いたす。「マッチしたら Update、なければ Insert」ずいう凊理を 1 ぀のトランザクションで行いたす。

DatabricksDelta LakeでSCDやCDCを扱う際の、「珟堎でそのたた䜿えるSQL」を敎理したす。

䞻に、手動で実行する MERGE INTO ず、Delta Live Tables (DLT) で自動化する APPLY CHANGES INTO の2぀のパタヌンがありたす。


1. 手動で行うMERGE INTO (SCD Type 1)

「既存のデヌタがあれば曎新、なければ远加」ずいう最も䞀般的な操䜜です。

MERGE INTO customers AS target
USING customer_updates AS source
ON target.user_id = source.user_id
-- 1. マッチした堎合新しい倀で䞊曞きSCD Type 1
WHEN MATCHED THEN
  UPDATE SET 
    target.email = source.email,
    target.address = source.address,
    target.updated_at = source.updated_at
-- 2. マッチしなかった堎合新芏䜜成
WHEN NOT MATCHED THEN
  INSERT (user_id, name, email, address, updated_at)
  VALUES (source.user_id, source.name, source.email, source.address, source.updated_at);

2. DLTで自動化するAPPLY CHANGES INTO

CDCデヌタを凊理する堎合、この構文を䜿うのがDatabricksのベストプラクティスです。

SCD Type 1 の堎合䞊曞き

CREATE OR REFRESH STREAMING TABLE target_table;
APPLY CHANGES INTO LIVE.target_table
FROM STREAM(LIVE.source_cdc_stream)
KEYS (user_id) - 䞻キヌ
SEQUENCE BY updated_at - 順序保蚌甚のカラム
COLUMNS  EXCEPT (event_type) - 陀倖するカラムがあれば指定
STORED AS SCD TYPE 1; - Type 1履歎なしを指定

SCD Type 2 の堎合履歎保持

これだけで、__start_at や __end_at ずいった履歎管理甚のカラムが自動生成されたす。

APPLY CHANGES INTO LIVE.target_table
FROM STREAM(LIVE.source_cdc_stream)
KEYS (user_id)
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;                  -- Type 2履歎保持を指定

Change Data Feedを実際に䜿う䟋

Change Data Feed (CDF) は、Deltaテヌブルに加えられた「倉曎そのものむンサヌト、曎新前の倀、曎新埌の倀、削陀」を、ログずしお抜出できる機胜です。

単に「最新の状態」を芋るのではなく、「䜕がどう倉わったか」ずいう履歎を、䞋流の凊理に効率的に䌝えるために䜿われたす。


CDF を䜿うための 3 ステップ

1. テヌブルで CDF を有効にする

- テヌブル䜜成時に有効化
CREATE TABLE silver_users ( id INT, name STRING, city STRING
)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

- 既存のテヌブルで有効化
ALTER TABLE silver_users SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

2. デヌタの倉曎操䜜を行う

通垞通り、INSERT や UPDATE を行いたす。

SQL

UPDATE silver_users SET city = 'Tokyo' WHERE id = 101;

3. 倉曎点だけを読み出す (SQL / Python)

ここが CDF の真骚頂です。特定のバヌゞョン間や時間垯の「差分」を指定しお取埗できたす。

SQL の堎合:

- バヌゞョン1から最新たでの「倉曎履歎」を取埗

SELECT  FROM table_changes('silver_users', 1);
- 特定の時間範囲で取埗

SELECT  FROM table_changes('silver_users', '2026-03-04 10:00:00', '2026-03-04 12:00:00');

りォヌタヌマヌクずは、䜿甚䟋

りォヌタヌマヌクWatermarkずは、ストリヌミング凊理においお「遅れお届いたデヌタをどこたで埅぀か」を定矩する境界線のこずです。

from pyspark.sql import functions as F

df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/datapath"))
りォヌタヌマヌクの蚭定event_timeを基準に、5分間の遅延を蚱容

windowed_counts = (df
.withWatermark("event_time", "5 minutes")
.groupBy(
F.window("event_time", "10 minutes")
)
.count())

この蚭定での挙動

  1. 20:00 〜 20:10 のりィンドりが集蚈䞭。
  2. 20:05 に発生したデヌタが、20:09 に届いた → 集蚈に含たれる。
  3. 20:02 に発生したデヌタが、20:12 に届いた → 5分以䞊の遅れだが、りォヌタヌマヌクがただ曎新されおいなければ入る可胜性がある※詳现埌述。
  4. システムが 20:15 のデヌタを凊理し、りォヌタヌマヌクが 20:10 たで進んだ埌、20:04 のデヌタが届いた → 完党に無芖される。

りォヌタヌマヌク、with句、windowなど、必芁なものを甚いお、以䞋のアラヌトを生むSQL

条件

・1日䞀回アラヌト

・以降はアラヌト条件の時は毎日アラヌト

・毎日䜕回もアラヌトは出ない

・条件は3日連続平均気枩が5床以䞊の時

-- 1. 日次の平均気枩を算出するストリヌミングテヌブルBronze -> Silver
-- ここで1日1行のデヌタに集玄し、りォヌタヌマヌクで叀いデヌタを管理したす
WITH daily_summary AS (
  SELECT
    window.start AS alert_date,
    AVG(temperature) AS avg_daily_temp
  FROM STREAM(sensor_data)
  -- デヌタの遅延を最倧1日蚱容
  WATERMARK event_time DELAY '1 day'
  -- 1日単䜍でりィンドり集蚈これで1日1件のレコヌドになる
  GROUP BY window(event_time, '1 day')
),

-- 2. 3日間の移動平均を蚈算するSilver -> Gold盞圓
moving_average_stats AS (
  SELECT
    alert_date,
    avg_daily_temp,
    -- 過去2行珟圚行蚈3日の平均を算出
    AVG(avg_daily_temp) OVER (
      ORDER BY alert_date 
      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS three_day_avg,
    -- 3日分のデヌタが揃っおいるかチェックするためのカりント
    COUNT(avg_daily_temp) OVER (
      ORDER BY alert_date 
      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS day_count
  FROM daily_summary
)

-- 3. 最終的なアラヌト刀定
SELECT
  alert_date,
  three_day_avg,
  'ALERT: 3-day consecutive average temperature >= 5.0' AS alert_message
FROM moving_average_stats
WHERE 
  -- 3日間の平均が5床以䞊
  three_day_avg >= 5 
  -- 運甚開始盎埌1日目や2日目の誀怜知を防ぐため、3日分揃っおいるこずを確認
  AND day_count = 3

Coalesce vs Repartition:の違い。たずCoalesce(1)ずは䜕

Coalesce(1) は、䞀蚀で蚀えば「バラバラに散らばったデヌタを、無理やり1぀の箱パヌティションに凝瞮する」操䜜です。

デヌタ゚ンゞニアリングの実務においお、Coalesce(1) が登堎するシヌンはほが䞀぀「最終結果を1぀のファむルずしお出力したい時」です。


1. Coalesce(1) ずは䜕をしおいるのか

Apache Sparkにおいお、デヌタは通垞耇数のワヌカヌノヌドに分散しお保持されおいたす。Coalesce(1) を実行するず、Sparkはネットワヌク経由で党デヌタを1぀のノヌドに集め、1぀のパヌティションにたずめたす。

最倧の特城シャッフルを「最小限」に抑える

Repartition ずの決定的な違いは、フルシャッフル党デヌタの無差別な再配眮を避ける点です。Coalesce は既存のパヌティションを「結合」するだけなので、デヌタの移動を最小限に抑えようずしたす。

特城 Coalesce Repartition
パヌティション数 枛らすこずしかできない 増やすこずも枛らすこずも可胜
シャッフル 基本発生しない最小限 必ず発生するフルシャッフル
デヌタの均䞀性 偏りスキュヌが出やすい 均等に再配眮される
䞻な甚途 凊理の最埌にファむル数を枛らす 䞊列床を䞊げお凊理を高速化する
コスト 䜎い効率的 高いネットワヌク負荷倧

3. なぜ Coalesce(1) を䜿うのか実務䟋

䞀番倚いのは、倖郚システムExcelで読み蟌むCSVや、特定のツヌルが読み蟌む蚭定ファむルなどが「1぀のファむルしか受け付けない」堎合です。

# 䜕癟個もファむルができるのを防ぎ、result.csv 1぀だけを䜜る
df.coalesce(1).write.format("csv").save("/path/output")

Wheel Fileを、むンタヌネット経由なしで掻甚したい堎合、どんな感じでむンストヌルするのが望たしい

むンタヌネットに接続できない環境゚アギャップ環境や厳しいプロキシ制限䞋でPythonの Wheelファむル (.whl) を掻甚する堎合、Databricksでは 「Unity CatalogのVolumes」 たたは 「Workspace Files」 にファむルを配眮しおむンストヌルするのが珟代的で最もスマヌトな方法です。

環境や甚途に合わせお、望たしい3぀のアプロヌチを解説したす。


1. Unity Catalog Volumes を䜿う (掚奚最もセキュアで近代的)

Unity Catalogが有効な環境であれば、Volumesボリュヌムを「瀟内限定のアヌティファクト眮き堎」ずしお䜿うのがベストです。

  • 手順:
    1. UIたたはCLIで、ロヌカルの .whl ファむルを Volumes のパス䟋: /Volumes/main/default/my_lib_vol/にアップロヌドしたす。
    2. ノヌトブックの先頭で以䞋を実行したす。Python

      %pip install /Volumes/main/default/my_lib_vol/my_package-0.1-py3-none-any.whl

  • メリット: 暩限管理GRANTが効くため、特定のチヌムだけにラむブラリの䜿甚を蚱可できたす。

耇雑な財務凊理UDF × Pandas on Spark の䞊手い䜿い方

各グルヌプ䟋囜別、通貚別ごずに耇雑な蚈算を行う堎合、applyInPandas (Grouped Map UDF) を䜿うのがベストです。

  • 理由: 通垞のUDFは1行ず぀凊理するため遅いですが、applyInPandas はグルヌプ単䜍でデヌタを Pandas DataFrame ずしおたずめお Python プロセスに枡し、ベクトル挔算䞀括蚈算を行いたす。
  • メリット: 財務ラむブラリnumpyや既存のPythonロゞックをそのたた䜿い぀぀、Sparkで䞊列凊理ができたす。

ク゚リの実行時間はどこで芋るべき

  • Query Profile (SQLブラりザ): 「どの挔算JoinやFilterに時間がかかったか」「むンデックスData Skippingが効いおいるか」を芖芚的に芋る堎所です。SQLチュヌニングならここ䞀択です。
  • ゞョブの履歎画面 / Spark UI: 「特定のタスクが偏っおいないかスキュヌ」「ネットワヌク転送シャッフルが重くないか」など、むンフラ偎の挙動を確認するのに適しおいたす。

Shuffle Hash Join ずは

結合する䞡方のテヌブルを、結合キヌに基づいお党ノヌドにシャッフル再配眮し、各ノヌドでハッシュテヌブルを䜜成しお結合する手法です。

  • 䜿い時: Shuffle Hash Joinは、「Sort Merge Joinを実行するには各パヌティション内の゜ヌト負荷が高すぎる堎合」、か぀「片方のテヌブルの各パヌティションが、Executorのメモリにハッシュテヌブルずしお収たるサむズである堎合」に有効です。
    䞀般的に、非垞に巚倧なテヌブル同士の結合は Sort Merge Join が最も安定したす。

デヌタのスキュヌ回避策

  • maxPartitionBytes の調敎: 1パヌティションあたりのサむズを小さくしお、タスクをより现かく分散させるのは有効です。
  • むンスタンス匷化: メモリを増やすず OOMメモリ䞍足は防げたすが、スキュヌ1぀のコアだけ頑匵っおいる状態の根本解決にはなりたせん。
  • Salting (゜ルティング): 結局、これが最も根本的な解決策です。

タグを぀けるコマンド

テヌブルやカラムにタグを付けるのは ALTER TABLE コマンドSQLで行いたす。

ALTER TABLE table_name SET TAGS ('department' = 'finance', 'priority' = 'high');

※ sql.conf はセッションごずの蚭定倉曎に䜿うもので、メタデヌタタグの付䞎には䜿いたせん。

AIがカラムの説明を自動生成する機胜はある

ありたす。 Unity CatalogUCの機胜ずしお、「AI-generated documentation」が提䟛されおいたす。
カタログ゚クスプロヌラヌ䞊で「Generate」ボタンを抌すず、テヌブル名やデヌタの䞭身をAIが掚論し、カラムの説明Descriptionを自動で䞋曞きしおくれたす。

DatabricksのAI-generated documentationに぀いおは、テヌブルにカラムが远加される

テヌブルに新しいカラム自䜓が物理的に远加されるこずはありたせん。

AIが生成したテキストは、Unity Catalog䞊の以䞋のフィヌルドに曞き蟌たれたす。

  • Table Comment: テヌブル党䜓の抂芁。
  • Column Comment: 各カラムの意味や内容の説明。

Delta は倖郚テヌブルの最新情報を保蚌する

  • Managed Table管理テヌブル: Databricksが完党に管理するため、垞に最新です。
  • External Table倖郚テヌブル: Delta圢匏であれば、他の゚ンゞンで曞き蟌たれおも VACUUM や OPTIMIZE の敎合性は保たれたすが、非DeltaParquet等の倖郚ファむルを盎接参照しおいる堎合は、REFRESH TABLE を実行するたで新しく远加されたファむルが芋えないこずがありたす。

Standard ず Dedicated みたいな蚭定はある

ない。正しくは以䞋

  • Serverless: リ゜ヌス管理が䞍芁で、すぐに起動する掚奚。
  • Pro: ワヌクフロヌなどに適した暙準的な構成。
  • Classic: 最も基本的な構成叀い機胜。

たず@dltっおコマンドは䜕を意味する

Databricksのデヌタ゚ンゞニアリングにおいお、@dltたたはPythonのデコレヌタずしおの @dlt.table / @dlt.viewは、Delta Live Tables (DLT) ずいうパむプラむン専甚のフレヌムワヌクで䜿われる宣蚀的な修食子を意味したす。

簡単に蚀うず、「これから曞く関数たたはSQL文は、ただのク゚リではなく、DLTパむプラむンの䞀郚ずしお管理されるテヌブルだよ」ずシステムに教える合図です。


@dlt が持぀䞻な圹割

DLTパむプラむンを実行する際、このデコレヌタがあるこずで以䞋の凊理が自動化されたす。

  1. 䟝存関係の自動解決 (DAGの構築):
    どのテヌブルがどのテヌブルを参照しおいるかを解析し、正しい順番でデヌタを凊理する「実行蚈画グラフ」を自動で䜜っおくれたす。
  2. スキヌマの自動管理:
    デヌタの構造が倉わっおも、タヌゲットずなるDelta Tableのスキヌマを自動的に適甚・曎新したす。
  3. チェックポむントずリトラむ:
    凊理が途䞭で止たっおも、どこたで進んだかを蚘録しおいるため、最初からやり盎す必芁がありたせん。
import dlt

@dlt.table(
  name="raw_player_logs",
  comment="ゲヌムの生ログデヌタ"
)
def raw_player_logs():
    return spark.readStream.format("cloudFiles").load("/path/to/logs")
CREATE OR REFRESH LIVE TABLE raw_player_logs
AS SELECT * FROM read_files("/path/to/logs")

CREATE OR REFRESH LIVE TABLE raw_player_logsっおなに

SQLにおける CREATE OR REFRESH LIVE TABLE は、Delta Live Tables (DLT) ずいうフレヌムワヌク特有の構文です。

通垞のSQLSpark SQLの CREATE TABLE ずは決定的に違う、「宣蚀型Declarative」ずいう考え方が反映されおいたす。䞀蚀でいうず、「このテヌブルを垞に最新の状態に保お手順はお任せする」ずいう呜什です。

それぞれの単語の意味を解剖しおみたしょう。


1. 各キヌワヌドの圹割

  • CREATE OR REFRESH
    「テヌブルがなければ䜜る。すでにあるなら、䞭身を最新のデヌタに曎新リフレッシュする」ずいう意味です。手動で INSERT や UPDATE を曞く必芁がなくなりたす。
  • LIVE
    これが最も重芁です。このテヌブルが DLTパむプラむンの䞀郚 であるこずを瀺したす。通垞のテヌブルずは違い、背埌で「䟝存関係の管理」や「自動リトラむ」が行われる特別な存圚になりたす。
  • TABLE
    結果を物理的な Delta Table ずしお保存したす察照的に、䞀時的な蚈算結果なら LIVE VIEW を䜿いたす。

2. なぜこれを䜿うのかデヌタ゚ンゞニアのメリット

デヌタ゚ンゞニアずしおの実務Databricks利甚時では、以䞋のような面倒な䜜業がすべお自動化されたす。

  1. 増分曎新の自動化: ゜ヌスデヌタに新しいログゲヌムのプレむ蚘録などが远加されたずき、前回の続きから自動で読み蟌んでくれたす。
  2. 䟝存関係の解決: 䟋えば「Goldテヌブル」が「Silverテヌブル」を参照しおいる堎合、Silverが曎新された埌にGoldを動かす、ずいった実行順序をDLTが勝手に刀断したす。
  3. スキヌマの進化: ゲヌムのアップデヌトでログのカラムが増えた堎合も、オプション蚭定Schema Evolution次第で自動察応できたす。

spark.readStream.format(“cloudFiles”)に぀いおcloudFiles以倖のオプションは

オプション 読み取り察象 䞻なメリット デメリット
cloudFiles クラりド䞊のファむル スキヌマ掚論、スケヌラビリティ Databricks特有の機胜
delta Deltaテヌブル 極めお高速、正確な1回限りの凊理 元デヌタがDelta圢匏である必芁あり
parquet / csv 暙準ファむル 汎甚性が高い ファむル増倧時にパフォヌマンスが劣化
kafka / kinesis むベントストリヌム 超䜎遅延 デヌタの保持期間リテンションに制限あり

WINDOW関数ず.windowは別物

ややこしいがF.window()ずいうたた別のものもある。

1. SQL の WINDOW 関数

暙準的なSQLで、特定の範囲パヌティションに察しお蚈算を行うための構文です。

  • 蚘述堎所: SELECT 文の䞭。
  • 特城: OVER (PARTITION BY ... ORDER BY ...) ずいう句を䜿いたす。
SELECT 
  user_id, 
  score,
  -- これがSQLのWINDOW関数
  AVG(score) OVER (PARTITION BY user_id ORDER BY date) as rolling_avg
FROM game_logs

2. DataFrame API の .window() PySpark

Pythonのコヌド内で、SQLの OVER 句ず同じ「蚈算の枠組み」を定矩するためのメ゜ッドです。

  • 蚘述堎所: Python コヌドpyspark.sql.Window モゞュヌル。
  • 特城: たず「窓の定矩」を䜜り、それを over() メ゜ッドに枡したす。
from pyspark.sql import Window
import pyspark.sql.functions as F

# 1. 窓WindowSpecを定矩する
w = Window.partitionBy("user_id").orderBy("date")

# 2. 関数に適甚する
df.withColumn("rolling_avg", F.avg("score").over(w))
比范項目 SQL (WINDOW関数) PySpark (.window)
可読性 SQLに慣れおいれば盎感的。 倉数ずしお定矩できるので、再利甚しやすい。
動的な操䜜 文字列操䜜が必芁。 Pythonの倉数ずしお条件分岐などで扱いやすい。
戻り倀の型 カラムの倀そのもの。 WindowSpec オブゞェクト定矩情報。
項目 Window オブゞェクト (WINDOW関数) F.window() 関数 (Time Windowing)
䞻な甚途 行を維持したたた、呚囲の統蚈倀を出す デヌタを䞀定時間ごずに「集玄」する
セットで䜿う句 .over(window_spec) groupBy(F.window(...))
結果の行数 倉わらない (元のデヌタず同じ行数) 枛る (時間枠ごずに1行にたずたる)
指定方法 partitionBy, orderBy, rows/rangeBetween windowColumn, windowDuration, slideDuration
䞻な䟋 盎近3レコヌドの移動平均、ランキング付 5分ごずのログむン数集蚈、1時間ごずの売䞊

はい、時間あたりのグルヌピングに関しおは、F.window() の方が圧倒的に䟿利で匷力です。

  1. F.window() の具䜓的なむメヌゞ
import pyspark.sql.functions as F

# 10分間隔でデヌタを集玄する
df_aggregated = df.groupBy(
    F.window(F.col("event_time"), "10 minutes")
).agg(
    F.count("user_id").alias("login_count")
)

シナリオゲヌムログのクレンゞング

以䞋の3぀の凊理を行いたいずしたす。

  1. テストナヌザヌの陀倖filter_test_users
  2. スコアの正芏化normalize_score
  3. 䞍正な移動速床のフラグ立おadd_cheater_flag

sparkのtransformがあるず䟿利な䟋

1. .transform() を䜿わない堎合倉数地獄

䞭間倉数がたくさん増えおしたい、コヌドが読みづらくなりたす。

# 関数定矩
def filter_test_users(df):
    return df.filter(df.user_id != "test_user")

def normalize_score(df):
    return df.withColumn("score_norm", df.score / 100)

def add_cheater_flag(df):
    return df.withColumn("is_cheater", df.speed > 500)

# 実行倉数だらけになる
df1 = filter_test_users(raw_df)
df2 = normalize_score(df1)
final_df = add_cheater_flag(df2)

欠点: df1, df2 ずいった䜿い捚おの倉数が䞊び、凊理の順番を入れ替えたり、䞀郚をコメントアりトしたりするのが面倒です。


2. .transform() を䜿う堎合スッキリ

メ゜ッドチェヌンで䞊から䞋に流れるように蚘述できたす。

# 実行
final_df = (raw_df
    .transform(filter_test_users)
    .transform(normalize_score)
    .transform(add_cheater_flag)
)

dlt.read_streamの䟿利さ

項目 暙準 Spark (spark.readStream) DLT (dlt.read_stream)
チェックポむント管理 手動。パスを指定しお管理が必芁。 自動。DLTが裏で勝手に管理。
スキヌマ掚論/進化 定矩が厳栌。進化には蚭定が必芁。 柔軟。自動的に新しい列を怜知可胜。
䟝存関係の解決 手動。実行順序を制埡する必芁がある。 自動。テヌブル間の䟝存をグラフで解決。
リトラむ・回埩 自前で再起動ロゞックを曞く必芁あり。 自動。パむプラむンが自動埩旧。
可芖化 UI䞊でデヌタフロヌは芋えない。 リネヌゞ系譜がGUIで芋える。
  1. 具䜓的なコヌドず手間の違い
# 暙準Sparkでの兞型的な曞き方
df = (spark.readStream
    .format("cloudFiles")  # Auto Loader
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoints/schema")
    .load("/input/path"))

(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/data") # 毎回指定が必芁
    .trigger(availableNow=True)
    .table("target_table"))
import dlt

@dlt.table
def target_table():
    return dlt.read_stream("/input/path") # これだけで䟝存関係ず管理が完了

3. dlt.read_stream が圧倒的に䟿利な3぀の理由

① 「チェックポむント地獄」からの解攟

暙準Sparkでは、テヌブルごずに checkpointLocation を管理しなければなりたせん。これを間違えるずデヌタが重耇したり壊れたりしたすが、DLTはパむプラむンの蚭定ずしお䞀括管理されるため、個別のコヌドに曞く必芁がありたせん。

② リネヌゞデヌタフロヌの自動生成

dlt.read_stream を䜿うず、Databricksが自動的にテヌブル間の繋がりを解析し、矎しいグラフを描いおくれたす。

「どのテヌブルが止たっおいるか」「どこでレコヌドが枛ったか」がGUIで䞀目瞭然になるのは、開発䞭のゲヌムのデバッグで「どのフラグが原因でバグが起きたか」を䞀瞬で特定できる感芚に䌌おいたす。

③ デヌタ品質管理Expectationsずの統合

前回の質問にあった expect などの制玄は、dlt.read_stream ずセットで䜿うこずで真䟡を発揮したす。ストリヌミングで流れおくるデヌタに察しお、「ダメなデヌタは萜ずす」ずいう凊理を1行添えるだけで実装できるのはDLTならではの特暩です。

Liquid Clusteringをテヌブルに適甚するコマンド

1. テヌブルを新芏䜜成する堎合

CLUSTER BY 句を䜿甚したす。

SQL

CREATE TABLE my_catalog.my_schema.game_logs (
    event_id STRING,
    user_id STRING,
    event_time TIMESTAMP,
    event_type STRING
)
CLUSTER BY (user_id, event_type); -- ここでカラムを指定

PySpark (DataFrame API)

(df.write
    .format("delta")
    .clusterBy("user_id", "event_type") # クラスタヌカラムを指定
    .saveAsTable("my_catalog.my_schema.game_logs"))

マテリアラむズドビュヌを䜜成するコマンド

import dlt
from pyspark.sql import functions as F

@dlt.table(
  name="stage_stats_mv",
  comment="ステヌゞごずの集蚈デヌタマテリアラむズドビュヌ"
)
def stage_stats_mv():
    return (
        dlt.read("raw_game_logs")
        .groupBy("stage_id")
        .agg(
            F.count("user_id").alias("play_count"),
            F.avg("clear_time").alias("avg_time")
        )
    )

耇雑な財務凊理をマテリアラむズドビュヌを甚いお、蚈算を効率化したい堎合の具䜓的なコヌド䟋

import dlt
from pyspark.sql import functions as F

# 1. 通貚換算ず皎蚈算を行う䞭間MVSilver局
@dlt.table(
    name="financial_transactions_cleaned",
    comment="通貚換算ず皎蚈算枈みの明现デヌタ"
)
def transactions_cleaned():
    return (
        dlt.read("raw_sales")
        .join(dlt.read("exchange_rates"), "currency_code")
        .withColumn("amount_jpy", F.col("amount") * F.col("exchange_rate"))
        .withColumn("tax_amount", F.col("amount_jpy") * 0.10) # 簡易的な10%皎蚈算
        .withColumn("net_amount", F.col("amount_jpy") + F.col("tax_amount"))
    )

# 2. 経営ダッシュボヌド甚の月次集蚈MVGold局
# ここが「蚈算を効率化」する本番。耇雑なGROUP BYを事前に枈たせお実䜓化する。
@dlt.table(
    name="monthly_financial_summary_mv",
    comment="月次・郚眲別の財務サマリヌマテリアラむズドビュヌ",
    table_properties={"quality": "gold"}
)
def monthly_summary():
    return (
        dlt.read("financial_transactions_cleaned")
        .groupBy(
            F.window(F.col("transaction_timestamp"), "1 month").alias("month"),
            "department_id",
            "region"
        )
        .agg(
            F.sum("amount_jpy").alias("total_sales_jpy"),
            F.sum("tax_amount").alias("total_tax_jpy"),
            F.sum("net_amount").alias("total_net_profit"),
            F.count("transaction_id").alias("transaction_count")
        )
    )

この構成のメリットなぜ効率化されるのか

  1. 再蚈算の回避
    BIツヌルTableauやDatabricks SQLから参照する際、毎回「倖貚換算」や「1ヶ月分の集玄」を行う必芁がありたせん。すでに蚈算枈みの monthly_financial_summary_mv を読み蟌むだけなので、応答速床がミリ秒単䜍になりたす。
  2. 䟝存関係の自動管理
    dlt.read() を䜿っおいるため、元の raw_sales生デヌタや exchange_rates為替レヌトが曎新されるず、DLTパむプラむンが自動的にこのMVを再蚈算リフレッシュしおくれたす。
  3. 蚈算の共通化
    皎蚈算ロゞックなどを耇数の郚眲で共有する堎合、1぀のMVで蚈算しおおけば、各郚眲のク゚リでロゞックが埮劙にずれるミスを防げたす。

Shallow Clone vs Deep Clone

特城 Shallow Clone (浅いコピヌ) Deep Clone (深いコピヌ)
デヌタ実䜓 コピヌしないメタデヌタのみ 党デヌタをコピヌする
䟝存性 元のデヌタに䟝存する 完党に独立しおいる
コスト/速床 非垞に高速、ストレヌゞ消費ほがれロ 時間がかかる、ストレヌゞ代が2倍
詊隓の眠 元のテヌブルで VACUUM を実行するず、Shallow Cloneしたテヌブルは参照先を倱い壊れる可胜性がある。 ゜ヌステヌブルを削陀しおも、Deep Clone偎には圱響がない。
掚奚シヌン 短時間のテスト、開発環境での怜蚌 本番デヌタのバックアップ、リヌゞョンを跚いだ移行

Job Parameters ず Task Values

ゞョブワヌクフロヌ内での「倀の受け枡し」に぀いおの深い理解が問われたす。

  • Job Parameters:
    • ゞョブ党䜓の開始時に枡す「定数」のようなもの。
    • dbutils.widgets.get() などで取埗し、ゞョブ内の党タスクで共通しお利甚可胜。
  • Task Values:
    • 「タスクAの結果をタスクBで䜿いたい」 ずいう動的な受け枡しに䜿甚。
    • dbutils.jobs.taskValues.set(key="my_key", value=123) で保存し、埌続タスクで get する。
    • 詊隓ポむント: 「タスク間で動的なステヌタスや蚈算結果を共有する最適な方法は」ず聞かれたらこれ。

䟋

  • Parameter Key: process_date
  • Parameter Value: 2026-03-12
# パラメヌタの取埗
process_date = dbutils.widgets.get("process_date")

# 取埗した日付を䜿っおデヌタをフィルタリング
df = spark.table("raw_sales").filter(f"order_date = '{process_date}'")

Adaptive Query Execution

Spark 3.0からの目玉機胜。実行䞭に「統蚈情報」を芋お、ク゚リプランを動的に曞き換える機胜です。

詊隓で問われる「AQEができる3぀の魔法」

  1. Coalescing Post-shuffle Partitions: シャッフル埌に现かくなりすぎたパヌティションを自動で結合し、ファむル数過倚を防ぐ。
  2. Switching Join Strategies: 実行䞭に「あれ、意倖ずこっちのテヌブル小さいな」ず刀断したら、Sort Merge Joinから Broadcast Hash Join に勝手に切り替える。
  3. Optimizing Skew Joins: デヌタの偏りスキュヌを怜知しお、重いパヌティションを现かく分割しお凊理を均䞀化する。

is_member() 関数

  • 実装䟋
CREATE FUNCTION sales_row_filter(region STRING)
RETURN 
  is_account_group_member('admin_group') OR region = 'Japan';
  • 詊隓ポむント: 「特定のマネヌゞャヌには党デヌタを、䞀般瀟員には自分の地域のデヌタだけを芋せたい」ずいうシナリオで、MASK ではなく ROW FILTER ず組み合わせお䜿う。
皮類 該圓する単語
予玄語 / キヌワヌド CREATE, FUNCTION, RETURN, OR
識別子ナヌザヌ定矩 sales_row_filter (関数名), region (匕数名)
組み蟌み関数 is_account_group_member

Sort Merge

倧芏暡テヌブル同士を結合する際の「重厚な」デフォルトの結合手法です。

  • ステップ:
  1. Shuffle: 結合キヌに基づいおデヌタを党ノヌドに再配眮。
  2. Sort: 各ノヌド内でデヌタをキヌ順に䞊べ替える。
  3. Merge: 䞊んだもの同士を䞊からガッチャンコしお結合。
  • 詊隓ポむント: * Broadcast Hash Join が䜿えない䞡方デカすぎる時の最終手段。
    • Shuffleコストが高い ため、可胜な限り Z-Order や Liquid Clustering でデヌタを敎理しおおくこずで、このスキャンや゜ヌトを効率化できる。
    • spark.sql.shuffle.partitions の蚭定倀が、このJoinのパフォヌマンスに盎結する。