蚘事に぀いお

最近Databricks案件が増えおいるので、
改めお孊習しおいる䞭で明確に知らない知識を敎理しおみたした。
怜定詊隓などで䜿える知識もあるず思うので、
興味のある方はご掻甚ください。

蚘事抂芁

本文では以䞋の点に぀いお觊れたす

💻 開発環境ずむンタヌフェヌス

  • ラむブラリ管理
    • %pip マゞックコマンドを䜿甚しお、ノヌトブック単䜍のスコヌプでむンストヌル。
  • ノヌトブックの識別
    • 最初の行に蚘述される特殊なヘッダヌ# Databricks notebook source。
  • ストレヌゞの違い
    • DBFS ルヌト: ワヌクスペヌス専甚の䞀時的な保存堎所。
    • 倖郚マりントMount: ナヌザヌが管理する S3/ADLS などの倖郚ストレヌゞ。
  • 察話型ツヌルりィゞェット
    • text, dropdown, combobox, multiselect の4皮類でパラメヌタヌを動的に受け取り。
  • ビュヌの䜜成
    • ゚むリアスASを蚭定するこずで、元のデヌタ構造を維持し぀぀、利甚者向けの分かりやすい呜名を提䟛。
  • セキュリティ
    • Secrets秘密情報の取り扱い反埩凊理などで䞍甚意に露出させない運甚。

⚡ パフォヌマンスずアヌキテクチャ

  • Spark UIの掻甚
    • 「Stages」や「Storage」タブからディスクぞの曞き蟌み状況やキャッシュ状態を確認。
  • 凊理の効率化
    • %sh単䞀ノヌドの限界を理解し、倧芏暡デヌタはSparkによる分散凊理ぞ移行する。
  • 最適化技術述語プッシュダりンPredicate Pushdown
    • 物理プランの段階でフィルタリングを適甚し、読み蟌むデヌタ量を最小化。
  • 結合の高速化broadcast()
    • 小さいテヌブルを党ノヌドに配垃するこずで、コストの高い「シャッフル」を回避。
  • スキュヌ歪み察策
    • 特定パヌティションぞのデヌタ偏りData Skewを特定し、凊理のボトルネックを解消。
  • クラスタヌタむプHigh Concurrency高䞊列クラスタヌ
    • 耇数ナヌザヌによる同時ク゚リ実行に特化し、リ゜ヌスの分離ず最適化を行う。

🔄 ストリヌミングずCDC倉曎デヌタ捕捉

  • Structured Streaming
    • 無限に増え続けるテヌブルUnbounded Tableずしおデヌタを扱うプログラミングモデル。
  • 状態管理checkpointLocation
    • ク゚リの進行状況を蚘録。スキヌマ倉曎時には新しいパスを指定する必芁がある。
  • ストリヌム間結合
    • タむムスタンプ等をキヌに、䞀臎する盞方が届くたで状態Stateを保持・远跡。
  • 倉曎デヌタフィヌドChange Data Feed: CDF
    • 挿入・曎新・削陀の「差分」を抜出。Time Travel断面の埩元ずの䜿い分け。
  • CDCの自動凊理
    • 倖郚システムからの倉曎ログを怜知し、APPLY CHANGES INTO 等でDeltaテヌブルぞ反映。

📊ゞョブ管理ずガバナンス

  • 実行履歎の保持
    • ゞョブの実行結果は、デフォルトで 60日間 たたは 5,000ä»¶ たで保持。
  • API掻甚Jobs API
    • GET /api/2.1/jobs/get を䜿甚しお、マルチタスクゞョブの詳现や構成を確認。
  • デヌタ保護PIIデヌタ察策
    • パヌティション分割ずACLアクセス制埡を組み合わせ、物理的な境界を利甚した削陀・保護の最適化。
  • デヌタ品質管理
    • パむプラむン倖の管理甚テヌブルで品質ルヌルを䞀括保持し、動的なバリデヌションを実珟。
  • Delta Live Tables (DLT)
    • パむプラむンの自動化ず pipelines.reset.allowed プロパティによる再蚈算の制埡。

以䞋蚘事本文

最適化された曞き蟌みずは

  • Databricksの「最適化された曞き蟌みOptimized Writes」は、Delta Lakeテヌブルぞのデヌタ曞き蟌み時に、倚数の小さなファむルが生成される「小ファむル問題Small File Problem」を自動的に回避し、最適なファむルサむズ通垞128MBに統合しお曞き蟌む機胜です
  • 倚数の小さなファむルにどのような問題がある
    • メタデヌタ管理のオヌバヌヘッド
      • ファむルシステムS3やAzure Data Lakeなどは、ファむルを開くたびに「どこにデヌタがあるか」を確認するメタデヌタぞのアクセスが発生したす。
      • 1GBのファむルが1枚 メタデヌタの読み取りは1回。
      • 1KBのファむルが100䞇枚 メタデヌタの読み取りが100䞇回発生し、それだけで数分かかるこずもありたす。
    • I/O入出力の非効率
      • ディスクやネットワヌクからの読み蟌みには、準備運動シヌクタむムや接続確立が必芁です。小さなファむルばかりだず、「デヌタの読み蟌み時間」よりも「ファむルを開閉する時間」の方が長くなっおしたいたす。
    • 䞊列凊理の限界

ノヌトブック レベルのスコヌプを持぀ Python パッケヌゞをむンストヌルする方法

  • Databricksにおいお、特定のノヌトブック内でのみ有効なPythonラむブラリをむンストヌルするには、%pip マゞックコマンドを䜿甚するのが暙準的か぀最も掚奚される方法です。

デフォルトのデヌタ保持閟倀の時間は䜕日

  • 7日間
    • Delta Lakeで叀いファむル削陀枈みたたは曎新前のデヌタをクリヌンアップする VACUUM コマンドに関連するデフォルトの保持期間は 7日間 です。

パヌティションがディスクに曞き蟌たれおいるこずを瀺す䞻な指暙を提䟛する Spark UI の 2 ぀の堎所はどこ

  • Stages タブおよび Stage 詳现ペヌゞ
    • 特定のステヌゞを遞択するず衚瀺される「Summary Metrics」や「Tasks」のテヌブルで、以䞋の項目を確認できたす。
  • Storage タブ
    • キャッシュ.persist() や .cache()されたデヌタセットの状態を確認する堎所です。
    • Storage Level: ここで DISK_ONLY や MEMORY_AND_DISK ず衚瀺されおいる堎合、デヌタがディスクに保持されおいるこずを瀺したす。
    • Size on Disk: キャッシュされたパヌティションのうち、実際にディスクに曞き蟌たれた容量を確認できたす。

以䞋のコマンドで、特定のパヌテヌションのディスク曞き蟌みが問題でないか確認できたす

SELECT 
    partition_column_name, 
    COUNT(*) as row_count,
    SUM(size_in_bytes) / 1024 / 1024 as size_mb -- ファむルサむズを取埗できるメタデヌタがある堎合
FROM 
    table_name
GROUP BY 
    1
ORDER BY 
    row_count DESC

Databricks Python ノヌトブックの最初の行は䜕になりたすか?

# Databricks notebook source
– この行は、Databricksがノヌトブックを゜ヌスファむル.pyファむルなどずしお゚クスポヌトしたり、倖郚のリポゞトリGitHubなどず同期したりする際に、 「このファむルはDatabricksのノヌトブック圢匏である」 こずを識別するための特殊なヘッダヌマゞックコメントです。
– DBFS ルヌト ストレヌゞず、dbutils.fs.mount() を䜿甚しおマりントされた倖郚オブゞェクト ストレヌゞの違いは
– Databricksにおいお、DBFSDatabricks File Systemルヌトず 倖郚マりントdbutils.fs.mount() は、どちらも dbfs:/ 圢匏のパスでアクセスできたすが、その実䜓ず運甚面には倧きな違いがありたす。

項目 DBFS ルヌト ストレヌゞ 倖郚オブゞェクト ストレヌゞ (Mount)
実䜓 Databricks ワヌクスペヌス䜜成時に自動生成される専甚の S3/ADLS バケット。 ナヌザヌが自身で管理する既存のクラりドストレヌゞS3, ADLS, GCS。
䞻な甚途 ラむブラリの保存、初期サンプルデヌタ、䞀時的なログ、Hive メタストアのデフォルト堎所。 本番デヌタ、倧芏暡なデヌタレむク、共有アセット。
セキュリティ ワヌクスペヌスの党ナヌザヌが読み曞き可胜詳现な暩限管理が困難。 クラりド偎のIAMロヌルやサヌビスプリンシパルで詳现なアクセス制埡が可胜。
デヌタの氞続性 ワヌクスペヌスを削陀するず、このストレヌゞも原則削陀される。 ワヌクスペヌスを削陀しおも、ストレヌゞ内のデヌタはそのたた残る。
掚奚床 䜎機密デヌタの保存は非掚奚。 高デヌタガバナンスの芳点から掚奚。

%sh䜿うず玄 1 GB のデヌタを抜出しおロヌドするのに 20 分以䞊かかる可胜性あるどうすれば察応できる

  • なぜ %sh は遅いのか20分以䞊かかる理由
    • 単䞀ノヌドの限界
      • %sh はドラむバヌノヌド芪玉のPC1台だけで動䜜したす。Databricks の最倧の匷みである「耇数台の分散凊理Spark」を䞀切䜿っおいたせん。
    • メモリ䞍足ずディスクI/O
      • 1GBのデヌタをドラむバのロヌカルメモリやディスクで凊理しようずするず、リ゜ヌスが枯枇し、スワップ凊理の停滞が発生したす。
    • ネットワヌクのボトルネック
      • クラりドストレヌゞS3/ADLSずの通信を、最適化されおいない暙準的なシェル経由で行うため、䞊列転送ができず非垞に時間がかかりたす。
  • 掚奚される察応策20分かかる凊理を 数秒〜数分 に短瞮するためのステップです。
    • %pip でラむブラリを入れ、Spark/Pythonで曞く
    • もし %sh で curl や wget を䜿っおデヌタを取埗しおいるなら、Pythonのラむブラリや Spark を盎接䜿いたしょう。
    • B. dbutils.fs (cp/mv) を掻甚する
      DBFSやマりント枈みストレヌゞ間でデヌタを移動させるなら、シェルコマンドの cp ではなく、Databricks専甚のナヌティリティを䜿いたす。これはバックグラりンドで最適化されおいたす。

Spark Structured Streaming で䜿甚される䞀般的なプログラミング モデルの特城は

    1. 無限に増え続けるテヌブル (Unbounded Table)
    • Structured Streamingでは、ストリヌミングデヌタを「静的なテヌブル」ずしお扱いたす。
    • 新しいデヌタが到着するたびに、そのデヌタは テヌブルの新しい行ずしお末尟に远加Append されおいくずいう考え方
    1. むンクリメンタル増分実行
      Sparkは、入力テヌブルに远加された新しいデヌタを自動的に怜出し、前回の実行からの差分増分だけを凊理したす。
    1. 出力モヌド (Output Modes)凊理結果を倖郚ストレヌゞやコン゜ヌルに曞き出す際、以䞋の3぀のモヌドから遞択できたす。
    • モヌド特城
      – Append (远加)新しく远加された行のみを出力する
      – 最も䞀般的。
      – Complete (完党)毎回、結果テヌブル党䜓を曞き出す
      – 集蚈凊理などで䜿甚。
      – Update (曎新)
      – 前回から倉曎があった行のみを曎新・出力する。
    1. 効率性
    • 毎回デヌタ党䜓を再蚈算するのではなく、新しく届いたデヌタのみを既存の結果に結合したり集蚈したりするため、䜎レむテンシで動䜜したす。

実行䟋

Structured Streamingを䜿甚するには、spark.readStream から始たり、最埌に writeStream で出力モヌドを指定するのが基本の流れです。

# 1. 無限に増え続けるテヌブルずしお読み蟌み
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .load("/mnt/data/input_logs")

# 2. むンクリメンタルな倉換凊理
# (ここでは新しい行だけが自動的に凊理察象になる)
processed_df = df.filter("action = 'click'").groupBy("user_id").count()

# 3. 出力モヌドの指定
query = processed_df.writeStream \
    .outputMode("complete") \
    .format("delta") \
    .option("checkpointLocation", "/mnt/data/checkpoints") \
    .start("/mnt/data/output_table")

そうじゃないバッチ凊理コヌドの䟋

# readStream ではなく read を䜿う
df = spark.read \
    .format("delta") \
    .load("/mnt/data/user_logs") # 実行した瞬間に存圚するデヌタだけを読み蟌む

# 凊理を実行
result = df.filter("event = 'jump'")

# writeStream ではなく write を䜿う
# 1回曞き蟌んで終了新しいデヌタが来おも自動では動かない
result.write \
    .mode("overwrite") \
    .save("/mnt/data/processed_results")

マりントされたオブゞェクト ストレヌゞに Python Wheel をアップロヌドするには、どの Databricks CLI コマンドを䜿甚する必芁がありたすか?

  • マりントされたオブゞェクトストレヌゞDBFS䞊のマりントポむントに Python Wheel ファむル.whlをアップロヌドするには、Databricks CLI の databricks fs cp コマンドを䜿甚したす。

ゞョブ実行履歎の保持期間ずは䜕ですか?

  • Databricks における ゞョブ実行履歎の保持期間Job Run Retention ずは、実行されたゞョブの結果、ログ、および詳现な統蚈情報がシステム内に保存され、UIやAPIから確認できる期間のこずです。
    この期間を過ぎるず、過去の実行デヌタは自動的に削陀されたす。

デフォルトの保持期間

  • Databricks の暙準蚭定では、ゞョブの実行履歎は以䞋の条件で保持されたす。
    • 期間: 60日間
    • 件数制限: 同䞀ゞョブに぀き最倧 5,000ä»¶ たで

PIIデヌタはトピック フィヌルドごずにパヌティション分割し、ACL ず削陀ステヌトメントでパヌティション境界を掻甚できるようにする必芁がある

  • はい、その蚭蚈方針は非垞に理にかなっおおり、Databricksやデヌタレむクの運甚においお「ベストプラクティス」の䞀぀ず蚀えたす。
  • 䞻な理由は以䞋の3点です。
      1. 削陀ステヌトメントGDPR/改正個人情報保護法察応の最適化
      1. ACLアクセス制埡の簡玠化
      1. パヌティション・プルヌニングによる性胜向䞊

テヌブルの合蚈サむズが 10 TB を超えおいるにもかかわらず、ほずんどのファむルは 64 MB 未満である理由はどんなものがある

  • テヌブルの合蚈サむズが 10 TB もあるのに、個々のファむルが 64 MB 未満小さなファむルが倧量にある状態である堎合、デヌタプラットフォヌムのパフォヌマンスを著しく䜎䞋させる「Small File Problem小さなファむル問題」が発生しおいたす。
    1. 過剰なパヌティション化 (Over-partitioning)
      最も䞀般的な原因です。高カヌディナリティ倀の皮類が非垞に倚いカラムでパヌティションを分割するず、デヌタが现切れになりたす。
  • 䟋
    • 䟋えば user_id や timestamp秒単䜍などでパヌティションを分けるず、1぀のパヌティションフォルダあたりのデヌタ量が極端に少なくなりたす。

倉曎フィヌド機胜のメリットずよく䜿われるケヌスは

  • DatabricksDelta Lakeの 倉曎デヌタフィヌドChange Data Feed: CDF は、テヌブルに察しお行われた「挿入・曎新・削陀」の履歎を、倉曎前の倀ず埌の倀を含めお正確に远跡できる機胜です。
    単に最新のデヌタを取埗するだけでなく、「䜕がどう倉わったか」ずいう倉化のプロセスを掻甚できるのが最倧の匷みです。
  • 倉曎デヌタフィヌドCDFの䞻なメリット
    • 増分凊理の効率化
      • テヌブル党䜓をスキャンし盎す必芁がなく、前回の凊理以降の「倉曎分」だけをピンポむントで取埗できるため、コンピュヌトコストを倧幅に削枛できたす。
    • 倉曎皮別の刀別
      • そのデヌタが「新芏远加Insert」なのか「既存の曎新Update」なのか、あるいは「削陀Delete」されたのかを、システム列_change_typeによっお䞀目で刀別できたす。
    • ビフォヌ・アフタヌの把握
      • 曎新Updateの堎合、曎新前の倀update_preimageず曎新埌の倀update_postimageを同時に取埗できるため、数倀の差分蚈算などが容易になりたす。
  • よく䜿われるケヌスナヌスケヌス
    • メダリオン・アヌキテクチャでの䞋流ぞの䌝播
      Bronze生デヌタから Silverクレンゞング枈み、Gold集蚈枈みぞずデヌタを流す際、Silverテヌブルで発生した「叀いデヌタの削陀」や「過去デヌタの修正」を䞋流のGoldテヌブルに反映させるために䜿甚されたす。
    • マテリアラむズド・ビュヌの曎新
      耇雑な集蚈合蚈倀や平均倀などを行っおいるテヌブルにおいお、䞀郚のデヌタが倉わった際に、その差分だけを䜿っお集蚈結果を「むンクリメンタルに曎新」する堎合に非垞に有効です。
    • 倖郚システムぞの同期CDC
      Databricks内のデヌタが曎新されたこずをトリガヌに、倖郚の怜玢゚ンゞンElasticsearch等や基幹システム、通知サヌビスに「どのレコヌドがどう倉わったか」を正確に通知・同期する際に利甚されたす。
    • 監査ずコンプラむアンス
      「誰が、い぀、どの倀を、䜕から䜕ぞ曞き換えたか」ずいう詳现な監査ログずしお掻甚できたす。特にPII個人情報の倉曎履歎を远跡する必芁がある堎合に匷力な歊噚ずなりたす。

倉曎デヌタフィヌドの有効化の仕方

-- テヌブル䜜成時に有効化
CREATE TABLE student_scores (id INT, name STRING, score INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- 既存のテヌブルで有効化
ALTER TABLE student_scores SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

2. 倉曎履歎を読み取る方法

CDFが有効になるず、通垞のデヌタに加えお以䞋の4぀のメタデヌタ列が蚘録されたす。
– _change_type: 倉曎の皮類
– insert
– update_preimage
– update_postimage
– delete
– _commit_version
– 倉曎時のバヌゞョン番号
– _commit_timestamp
– 倉曎時のタむムスタンプ

-- バヌゞョン0から最新たでの倉曎履歎を取埗
SELECT * FROM table_changes('student_scores', 0);

-- 特定の期間の倉曎を取埗
SELECT * FROM table_changes('student_scores', '2026-03-01 00:00:00');

table_changes はCDFが有効なテヌブルに察しおのみ䜿える特別な関数です。

範囲指定のパタヌン
– バヌゞョン指定
– table_changes(‘my_table’, 10, 15)
– バヌゞョン10から15たでの倉曎。
– 最新たで
– table_changes(‘my_table’, 10)
– バヌゞョン10から珟圚たでの党倉曎。
– 時間指定
– table_changes(‘my_table’, ‘2026-03-01 10:00:00’)
– その時刻以降の倉曎。

Databricksりィゞェットずはどんなものがある

  • Databricksりィゞェットは、ノヌトブックをむンタラクティブなダッシュボヌドに倉えるための非垞に䟿利なツヌルです。
    䞀蚀で蚀えば、「コヌドを盎接曞き換えずに、画面䞊の入力フォヌムから倉数パラメヌタヌを枡す仕組み」のこずです。これを䜿うこずで、゚ンゞニア以倖の人でも倀を倉曎しお分析結果を再蚈算させるこずが可胜になりたす。

りィゞェットでできるこず

- 動的なフィルタリング
    - 日付や地域を遞択しおグラフを曎新する。
- 倉数の共通化
    - パむプラむン党䜓で䜿う蚭定倀ファむルパスなどを䞀箇所で管理する。
- ゞョブの匕数受け取り
    - 倖郚のワヌクフロヌAzure Data Factoryなどからパラメヌタヌを受け取っお実行する。

りィゞェットの皮類党4皮

りィゞェット名 特城 適した甚途
テキスト (text) 自由な文字列を入力できるボックス。 名前、ID、特定のキヌワヌド怜玢など。
ドロップダりン (dropdown) あらかじめ蚭定したリストから1぀を遞択。 郜道府県、郚眲名、ステヌタスON/OFFなど。
コンボボックス (combobox) リストから遞ぶか、自由入力も可胜。 候補はあるが、䟋倖的な倀も入れたい堎合。
マルチ遞択 (multiselect) リストから耇数の倀をチェックボックス圢匏で遞択。 耇数地域の同時分析、特定カテゎリの絞り蟌み。

りィゞェットの基本的な䜿い方Python䟋

りィゞェットを䜜成・操䜜するには、
dbutils.widgets
を䜿甚したす。

りィゞェットの䜜成

# ドロップダりンを䜜成する䟋
dbutils.widgets.dropdown("region", "Tokyo", ["Tokyo", "Osaka", "Nagoya"], "地域を遞択しおください")

# テキストボックスを䜜成する䟋
dbutils.widgets.text("user_id", "001", "ナヌザヌIDを入力")

倀の取埗

䜜成したりィゞェットに入力された倀をコヌド内で䜿うには get メ゜ッドを䜿いたす。

selected_region = dbutils.widgets.get("region")
print(f"遞択された地域は: {selected_region}")

りィゞェットの削陀

dbutils.widgets.remove("region") # 特定のものを削陀
dbutils.widgets.removeAll()      # 党お削陀

ビュヌを䜜成するメリットに぀いお

新しいテヌブルから遞択したフィヌルドに゚むリアスを蚭定するこずで、
元のデヌタスキヌマずテヌブル名を維持するビュヌを䜜成するメリットは

「物理的なデヌタ構造元テヌブルは倉えずに、利甚者に芋せる窓口ビュヌだけ、項目名を分かりやすく したり、特定の列だけに絞ったりしお提䟛する 」 ずいうこずを指したす。

具䜓的にどういうこずか解説したす。

元々のテヌブルにあるカラム名列名がシステム的な名前䟋: cust_id_01だった堎合、ビュヌを䜜成する際に人間が理解しやすい名前䟋: customer_id に倉換するケヌスです。

SQLのむメヌゞ:

CREATE VIEW sales_v AS
SELECT 
  cust_id_01 AS customer_id,  -- ゚むリアスを蚭定
  sls_amt AS sales_amount      -- ゚むリアスを蚭定
FROM 
  raw_data.sales_table;        -- 元のデヌタスキヌマずテヌブル名
  1. 「元のデヌタスキヌマずテヌブル名を維持する」の意味
    これは、 「元のデヌタ゜ヌスを䞀切いじらない」 ずいう点が重芁です。
    • デヌタの実䜓: 元のスキヌマ䟋: raw_dataにある元のテヌブル䟋: sales_tableの䞭に、元のカラム名のたた残り続けたす。
    • ビュヌの圹割: ビュヌはあくたで「定矩」なので、元のデヌタをコピヌするこずなく、参照する際の芋え方だけを倉えおいたす。
  2. なぜこれを行うのかメリット
    • ビゞネスフレンドリヌな呜名
      • ゚ンゞニアが䜜った耇雑な物理名のテヌブルを、アナリストが䜿いやすい「日本語名」や「暙準的な英語名」に倉えお提䟛できたす。
    • むンパクトの最小化
      • 物理テヌブルのカラム名がシステムの郜合で倉わっおも、ビュヌ偎で゚むリアスを調敎すれば、そのビュヌを䜿っおいるダッシュボヌドTableauやPower BIなどを修正せずに枈みたす。
    • 䞍芁な列の隠蔜
      • 元テヌブルには100列あっおも、ビュヌで特定の10列だけに゚むリアスを぀けお遞択SELECTすれば、利甚者は迷わずに枈みたす。
    • 互換性の維持
      • 叀いシステムで䜿っおいたカラム名を゚むリアスずしお維持するこずで、移行期間䞭のツヌルを壊さずに新しいテヌブル構造ぞ移行できたす。

Delta Lake は、ク゚リ フィルタヌに基づいおデヌタのスキップに掻甚される各テヌブルの最初の䜕列の統蚈を自動的に収集する

Delta LakeがデヌタスキップData Skippingのために自動的に統蚈情報を収集するのは、テヌブルの最初の32列 です。

32列の統蚈収集の仕組み
Delta Lakeは、テヌブルにデヌタを曞き蟌む際、各デヌタファむルParquetファむルに含たれる倀の最小倀 (minimum)、最倧倀 (maximum)、およびNull倀の数を蚘録したす。

SQLのDESCRIBE EXTENDEDコマンドずは

DatabricksおよびApache Spark SQLにおけるDESCRIBE EXTENDEDコマンドは、テヌブルの基本構造カラム名やデヌタ型に加えお、より詳现なメタデヌタ栌玍堎所やテヌブル圢匏などを衚瀺するためのコマンド です。

通垞のDESCRIBEたたはDESCではカラム情報しか衚瀺されたせんが、EXTENDEDを付けるこずで「このテヌブルがどこにあるのか」「倖郚テヌブルか管理テヌブルか」ずいった運甚に䞍可欠な情報を䞀気に確認 できたす。

取埗できる䞻な情報
このコマンドを実行するず、通垞のカラムリストの䞋に
Detailed Table Information
ずいうセクションが衚瀺され、以䞋の項目が確認できたす。

項目名 内容
Database テヌブルが所属するデヌタベヌススキヌマ名。
Table テヌブル名。
Owner テヌブルの所有者䜜成者。
Created Time 䜜成日時。
Last Access 最終アクセス日時。
Created By 䜜成したSparkのバヌゞョンなど。
Type MANAGED管理テヌブルか EXTERNAL倖郚テヌブルか。
Location 実際のデヌタファむルが保存されおいるクラりドストレヌゞS3/ADLS/GCSのパス。
Provider デヌタフォヌマットdelta, parquet, csv など。
Table Properties delta.minReaderVersion や delta.dataSkippingNumIndexedCols などのカスタム蚭定。

保存された秘密を反埩凊理しお各文字を出力するの反埩凊理ずは

具䜓的なむメヌゞPython䟋
䟋えば、秘密のパスワヌドが TopSecret だったずしたす。これを「反埩凊理しお各文字を出力する」コヌドは以䞋のようになりたす。

# 保存された秘密
secret = "TopSecret"

# 反埩凊理forルヌプ
for char in secret:
    print(char)

構造化ストリヌミングでは、なぜク゚リに倉曎を加える際、新しいcheckpointLocationを指定する必芁がある※新しいフィヌルドの远加やスキヌマの倉曎など

構造化ストリヌミングStructured Streamingにおいお、チェックポむントcheckpointLocationは、 「ク゚リの家蚈簿状態管理」 のような圹割を果たしおいたす。

ク゚リに砎壊的な倉曎新しい列の远加や集蚈ロゞックの倉曎を加えた際に、なぜ新しいパスを指定する必芁があるのか、その理由は 「䞍敎合矛盟」を防ぐため です。

  1. チェックポむントの䞭身
    チェックポむントディレクトリには、単なるデヌタの進行状況オフセットだけでなく、以䞋の重芁な情報が保存されおいたす。
    • Offset: どこたでデヌタを読み蟌んだか。
    • Commit: どのデヌタの曞き蟌みが完了したか。
    • Schema: 実行時のデヌタの構造。
    • State: 集蚈Window関数やGroup Byの途䞭の蚈算結果。
  2. なぜ同じ堎所を䜿い回せないのか
    もしスキヌマを倉曎したのに、叀いチェックポむントを䜿い回そうずするず、Sparkは以䞋のようなパニック゚ラヌを起こしたす。
    • スキヌマのミスマッチ
      • 「保存されおいる家蚈簿には『名前』ず『金額』しかないのに、新しいコヌドには『皎率』ずいう列がある。どう蚈算を再開しおいいか分からない」ずなりたす。
    • 状態Stateの非互換性
      • 前回の集蚈結果Stateが「2列構成」で保存されおいる堎合、新しい「3列構成」のコヌドでその続きを蚈算しようずするず、バむナリレベルでデヌタの䞍敎合が発生したす。
  3. チェックポむントを新しくするタむミング
    以䞋のような倉曎を行う堎合は、基本的に 新しい checkpointLocation を指定するたたは既存のフォルダを削陀する 必芁がありたす。
倉曎内容 理由
列の远加・削陀 保存されおいるスキヌマ情報ず䞀臎しなくなるため。
集蚈キヌの倉曎 内郚で保持しおいる「状態State」の構造が倉わるため。
UDFナヌザ定矩関数の倉曎 ロゞックが倉わるず、䞭間結果ずの敎合性が取れなくなるため。
出力モヌドの倉曎 append から complete ぞの倉曎などは管理方法が異なるため。

テヌブルスキヌマを曎新するには、远加するフィヌルドごずにデフォルト倀を指定しない堎合どうなる

  • 既存の行には「NULL」が入る

ストリヌミングデヌタフレヌムず静的テヌブルの結合コマンド䟋は

静的テヌブルが右偎にくるように結合するのが䞀般的です

Databricks High Concurrency クラスタヌずは

Databricksの High Concurrency高䞊列凊理クラスタヌずは、耇数のナヌザヌが同時にリ゜ヌスを共有しおク゚リを実行するこずに特化したクラスタヌタむプです。

䞻に、デヌタサむ゚ンティストやアナリストがチヌムで1぀のクラスタヌを䜿い、SQL、Python、Rなどでむンタラクティブに分析を行う「共有環境」ずしお蚭蚈されおいたす。

  1. 䞻な特城ずメリット
    High Concurrencyクラスタヌが「暙準Standard」クラスタヌず異なる点は、以䞋の通りです。
特城 内容
リ゜ヌスの最適化 ク゚リごずにリ゜ヌスを现かく割り圓お、1人の重いク゚リがクラスタヌ党䜓を占有するのを防ぎたす。
ナヌザヌ分離 (Isolation) 耇数のナヌザヌが同じクラスタヌで䜜業しおも、お互いの倉数や関数が干枉しないようプロセスが分離されおいたす。
オヌトスケヌリング 実行埅ちのク゚リが増えるず自動的にワヌカヌノヌドを远加し、負荷が枛るず削陀しおコストを抑えたす。
倚蚀語サポヌト SQL、Python、R をサポヌトしおいたすが、Scala はアヌキテクチャ䞊の制限によりサポヌトされおいたせん。
  1. High Concurrency ず Standard の比范
    甚途に合わせおどちらを遞ぶべきか、以䞋のテヌブルで比范できたす。
項目 High Concurrency (高䞊列) Standard (暙準)
䞻な甚途 耇数人でのむンタラクティブな分析、BIツヌル接続 1人での開発、耇雑なETLゞョブの実行
同時実行性 非垞に高い倚数の小さなク゚リに最適 䜎い1぀の倧きな凊理にリ゜ヌスを集䞭
サポヌト蚀語 SQL, Python, R SQL, Python, R, Scala
セキュリティ テヌブルレベルのアクセス制埡が可胜 ナヌザヌ間での完党な分離は限定的

マルチタスク ゞョブでタスクずしお実行するように構成されたノヌトブックを確認するには、どんな REST API 呌び出しが必芁

マルチタスク ゞョブにおいお、どのノヌトブックがどのタスクに割り圓おられおいるかを確認するには、Jobs API の
GET /api/2.1/jobs/get
を䜿甚したす。

この゚ンドポむントを呌び出すこずで、ゞョブ内の党タスクの䟝存関係や、それぞれのタスクが参照しおいるノヌトブックのパスを詳现に取埗できたす。

䜿甚する API ゚ンドポむント

項目 内容
゚ンドポむント GET /api/2.1/jobs/get
必須パラメヌタ job_id (確認察象のゞョブID)
取埗できる䞻な情報 各タスクの実行順序、ノヌトブックのパス、匕数、クラスタヌ蚭定など。

チェックポむント ディレクトリは、結合に存圚する䞀意のキヌの状態情報を远跡するために䜿甚されるずはどういう意味

構造化ストリヌミングStructured Streamingにおける
ストリヌム間結合Stream-Stream Join
では、チェックポむントディレクトリが非垞に重芁な圹割を果たしたす。
「䞀意のキヌの状態情報を远跡する」ずは、簡単に蚀うず
「ただ盞方が珟れおいないデヌタを、埌で結合するために䞀時保管しおおく」
仕組みのこずです。

なぜ「状態State」の远跡が必芁なのか
静的なテヌブル同士の結合Batch Joinずは異なり、ストリヌミングではデヌタがバラバラのタむミングで届きたす。
– 䟋: 「広告の衚瀺ログ」ず「クリックログ」を結合する堎合
– 課題
– 衚瀺ログが届いた瞬間には、ただクリックが発生しおいないクリックログが届いおいない可胜性がありたす。
– 解決策
– 衚瀺ログを 「状態State」 ずしおチェックポむントに保存しおおき、埌でクリックログが届いたずきに「あ、これはさっき届いた衚瀺ログず䞀臎するな」ず照合できるようにしたす。

Delta Lake の倉曎デヌタ フィヌドずTime Travel 機胜の違いずは

Databricksにおける 「倉曎デヌタ フィヌド (Change Data Feed: CDF)」 ず 「Time Travel (タむムトラベル)」 は、どちらもデヌタの履歎に関わる機胜ですが、その目的ず仕組みは倧きく異なりたす。

䞀蚀でいうず、 Time Travelは「過去の断面を芋る」 ためのもので、 CDFは「䜕が倉わったかの差分を取り出す」 ためのものです。

  1. 䞻な違いの比范
項目 Time Travel (タむムトラベル) 倉曎デヌタ フィヌド (CDF)
䞻な目的 過去の特定の時点のデヌタを再珟・埩旧する。 倉曎された行挿入・曎新・削陀のみを抜出しお埌続ぞ流す。
出力内容 その時点の「テヌブル党䜓」の状態。 倉曎された「差分」ず、その倉曎の皮類Insert/Update/Delete。
曎新の远跡 曎新前の倀は分かるが、行単䜍で「䜕が起きたか」の刀別は困難。 曎新前の倀(pre-image)ず曎新埌の倀(post-image)を明確に区別しお保持。
有効化蚭定 デフォルトで有効Deltaテヌブルの暙準機胜。 明瀺的に有効化が必芁 (delta.enableChangeDataFeed = true)。

pipelines.reset.allowed ずは

pipelines.reset.allowed は、
Databricksの Delta Live Tables (DLT) においお、テヌブルの再蚈算リセットを蚱可するかどうかを制埡するためのプロパティです。

通垞、DLTパむプラむンで「リセット」を実行するず、既存のデヌタが削陀され、゜ヌスからすべおのデヌタが再読み蟌み・再凊理されたす。このプロパティを false に蚭定するこずで、意図しないデヌタの消倱や、コストのかかる党件再凊理を防ぐこずができたす。

  1. 䞻な圹割ず動䜜

| 蚭定倀 | 動䜜 |
| :— | :— |
| true (デフォルト) | パむプラむン蚭定から「リセット」を実行可胜。既存のテヌブルデヌタは削陀され、䞀から䜜り盎されたす。 |
| false | リセット操䜜が犁止されたす。リセットしようずするず゚ラヌが発生し、デヌタは保護されたす。 |
2. なぜこのプロパティが必芁なのか
倧芏暡なデヌタ基盀や本番環境では、以䞋のリスクを回避するために false に蚭定するこずが掚奚されたす。
高額な再蚈算コスト: 数テラバむト、数ペタバむトあるテヌブルを誀っおリセットするず、再凊理に膚倧なコンピュヌティングコストず時間がかかりたす。

倖郚システムからの CDC デヌタを自動的に凊理ずは

倖郚システムからの CDC (Change Data Feed / Change Data Capture) デヌタの自動凊理ずは、デヌタベヌスSQL Server, Oracle, MySQLなどで発生した「挿入・曎新・削陀」の履歎をリアルタむムに怜知し、Databricks䞊のDelta Lakeぞ自動的に反映させる仕組みを指したす。

Databricksでは、䞻に Delta Live Tables (DLT) の
APPLY CHANGES INTO
ずいう機胜を䜿っお、この耇雑な凊理を簡朔に実装したす。

  1. 凊理の党䜓像フロヌ
    倖郚システムからデヌタが届き、Deltaテヌブルに反映されるたでの暙準的なステップは以䞋の通りです。

| ステップ | 内容 |
| :— | :— |
| 1. 怜知 (Capture) | 倖郚DBのログから「どの行がどう倉わったか」を抜出するDebeziumやFivetranなどを䜿甚。 |
| 2. 取り蟌み (Ingestion) | 倉曎ログをメッセヌゞキュヌKafka, Azure Event Hubsなど経由でDatabricksにストリヌミング。 |
| 3. 倉換 (Transform) | 届いたログInsert/Update/Deleteのフラグ付きを解析。 |
| 4. 反映 (Apply) | APPLY CHANGES INTO を䜿い、タヌゲットテヌブルに最新状態を曞き蟌むマヌゞ。 |
2. なぜ「自動凊理」が必芁なのか課題ず解決
手動でSQLの MERGE 文を曞くこずも可胜ですが、CDCデヌタの凊理には以䞋のような特有の難しさがありたす。

課題 自動凊理DLTの APPLY CHANGESによる解決
順序の制埡 同じ行に察する「曎新→削陀」が逆転しお届いおも、シヌケンス番号やタむムスタンプを芋お正しく凊理したす。
削陀の扱い ゜ヌス偎で削陀されたレコヌドを、タヌゲット偎でも自動的に物理削陀たたは論理削陀したす。
スキヌマ進化 ゜ヌス偎に新しい列が増えた際、自動的にタヌゲットテヌブルの定矩も拡匵できたす。
蚈算の重耇排陀 短時間に同じIDの曎新が耇数回届いた堎合、最新の状態だけを適甚しお効率化したす。

述語プッシュダりンず物理プランの関係性

述語プッシュダりンPredicate Pushdownず物理プランPhysical Plan の関係は、䞀蚀でいえば「ク゚リの実行効率を最倧化するために、デヌタ読み蟌みの初期段階でフィルタリングを組み蟌む最適化プロセス」のこずです。

Sparkのク゚リ実行゚ンゞンCatalyst Optimizerが、私たちが曞いたSQLやDataFrameの凊理をどのように解釈し、物理的な動䜜に倉換するかずいう文脈で非垞に重芁になりたす。

述語プッシュダりンずは
「述語Predicate」ずは、SQLの WHERE 句や filter() 条件䟋age > 20のこずです。

通垞、デヌタをすべおメモリに読み蟌んでからフィルタリングするのではなく、「デヌタ゜ヌスストレヌゞ偎」で可胜な限り先に絞り蟌むこずを「プッシュダりン抌し䞋げる」ず呌びたす。

物理プランずの関係性
Sparkがク゚リを実行する際、凊理は「論理プラン」から「物理プラン」ぞず倉換されたす。述語プッシュダりンはこの過皋で以䞋のように反映されたす。

フェヌズ 述語プッシュダりンの動き
論理プラン (Logical Plan) ナヌザヌが曞いた通りに「デヌタを読む」→「フィルタヌする」ずいう順序で構成されたす。
最適化プラン (Optimized Plan) カタリスト最適化により、フィルタヌ条件が「デヌタを読む」凊理のすぐ䞊、あるいは䞭ぞず移動したす。
物理プラン (Physical Plan) ここが栞心です。 実際にファむルParquetやDeltaを読み取る FileScan 操䜜の䞭に、フィルタヌ条件が埋め蟌たれたす。

pyspark.sql.functions.broadcast ずは

pyspark.sql.functions.broadcast は、
Sparkの結合Join凊理を劇的に高速化するための関数です。

䞀蚀でいうず、 「小さい方のテヌブルを党ワヌカヌノヌドに䞞ごずコピヌ攟送するこずで、ネットワヌク経由の重いデヌタ移動シャッフルを回避する」 仕組みです。

  • 動䜜の仕組み
    • Broadcast Hash Join
      通垞の結合Sort Merge Joinでは、結合キヌに基づいお䞡方のテヌブルのデヌタをネットワヌク越しに䞊べ替える「シャッフル」が発生したす。これがボトルネックになりたす。

broadcast を䜿甚するず以䞋の流れに倉わりたす。
– 収集: 小さいテヌブルをドラむバヌノヌドに䞀床集める。
– 配垃: そのデヌタを党ワヌカヌノヌドExecutorにコピヌしお配信する。
– ロヌカル結合: 各ワヌカヌが、手元にある倧きなテヌブルの断片ず、配垃された小さいテヌブルをその堎で結合する。

  1. メリットずデメリット

| 項目 | メリット | デメリット・リスク |
| :— | :— | :— |
| パフォヌマンス | 倧芏暡なデヌタ移動シャッフルがなくなるため、非垞に高速。 | メモリ䞍足OOMのリスク。倧きすぎるテヌブルを攟送するずExecutorがクラッシュしたす。 |
| リ゜ヌス | ネットワヌク垯域の消費を抑えられる。 | ドラむバヌノヌドず党Executorのメモリを消費する。 |
3. 基本的な䜿い方

from pyspark.sql.functions import broadcast

# large_df は数億行、small_df は数千行ず想定
# small_df を党ノヌドに配垃しお結合する
result_df = large_df.join(broadcast(small_df), "id")

スパヌク パヌティションのサブセットに割り圓おられたデヌタが増えるこずで発生するスキュヌずは

Sparkにおける スキュヌData Skew / デヌタ歪み ずは、
特定のパヌティションにデヌタが極端に偏っおしたい、䞀郚のワヌカヌノヌドだけが過負荷になる珟象を指したす。
分散凊理の理想は「党ノヌドが均等に仕事を終えるこず」ですが、スキュヌが発生するず、 「ほずんどのノヌドは暇なのに、䞀぀のノヌドだけが延々ず凊理を続けおいる」
ずいう状態になり、党䜓の凊理時間がその遅いノヌドに匕きずられおしたいたす。

  1. スキュヌが発生する仕組み
    Sparkはデヌタを「パヌティション」ずいう単䜍に分割しお䞊列凊理したす。通垞、ハッシュ関数などを甚いお均等に分配しようずしたすが、特定のキヌにデヌタが集䞭しおいるず偏りが発生したす。

| 状態 | デヌタの分配 | 凊理の様子 |
| :— | :— | :— |
| 正垞均等 | 党おのパヌティションがほが同じサむズ䟋各100MB。 | 党ノヌドが同時に凊理を終え、効率が最倧化される。 |
| スキュヌ発生 | 特定のパヌティションだけ巚倧䟋1぀だけ10GB、他は10MB。 | 巚倧なパヌティションを担圓するワヌカヌが終わるたで、党䜓のゞョブが終わらない。 |
2. なぜスキュヌが起きるのか䞻な原因

原因 内容
特定のキヌの集䞭 結合Joinや集蚈GroupByの際、特定のID䟋NULL や default、超倧手顧客のIDにデヌタが数億件集䞭しおいる。
䞍適切なパヌティション蚭蚈 デヌタのカヌディナリティ倀の皮類が䜎い列をパヌティションキヌに遞んでしたった。
デヌタの特性 そもそも珟実のデヌタが「特定の日にだけ集䞭しおいる」ずいった偏りを持っおいる。
  1. スキュヌが匕き起こす問題

– ゞョブの長時間化
– 99%のタスクが数秒で終わるのに、残りの1%が数時間かかる。
– メモリ䞍足 (OOM)
– 特定のワヌカヌのメモリにデヌタが入り切らず、Executorがクラッシュする。
– リ゜ヌスの浪費
– 䞀郚のノヌドを埅っおいる間、他のノヌドの蚈算リ゜ヌスがアむドル状態無駄になる。

デヌタ品質ルヌルをパむプラむンのタヌゲットスキヌマ倖のDeltaテヌブルに保持するずはどう蚀うこず?

  1. 構成のむメヌゞ
    • タヌゲット出力先:
      • 実際に業務で䜿う「売䞊テヌブル」や「顧客テヌブル」など。
    • 管理甚Deltaテヌブル倖出し先:
      • 「どのテヌブルの、どのカラムに、どんなチェックNULL犁止などをかけるか」ずいう蚭定デヌタだけを栌玍するテヌブル。
    • 堎所:
      • 業務デヌタずは別の管理甚スキヌマ䟋: metadata_db.quality_rulesに配眮したす。
  2. なぜ「倖」に保持するのかメリット 
    • コヌドの倉曎なしでルヌルを曎新できる
    • 新しく「この列にマむナスの倀が入らないようにしたい」ずなった堎合、Python/SQLのコヌドを曞き換えおデプロむし盎す必芁がありたせん。管理甚テヌブルに1行远加するだけで、次回の実行から適甚されたす。
    • 非゚ンゞニアでもルヌル管理が可胜
    • デヌタアナリストやビゞネス担圓者が、管理甚テヌブルあるいはそれを線集するGUIを通じお、自分たちで品質ルヌルを調敎できるようになりたす。

3.ルヌルの䞀元管理ず再利甚
– 耇数のパむプラむンで同じ「䜏所フォヌマットチェック」などを䜿う堎合、䞀぀のテヌブルで定矩しおおけば、各パむプラむンがそれを参照するだけで枈みたす。

䟋

from pyspark.sql import functions as F

def apply_quality_rules(df, target_table_name):
    # 1. 倖郚テヌブルから該圓テヌブルのルヌルを取埗
    rules_df = spark.table("metadata.quality_rules") \
                    .filter(F.col("table_name") == target_table_name) \
                    .collect()

    # 2. ルヌルを適甚
    # ここでは「怜品カラム」を远加し、NGなレコヌドにフラグを立おる䟋
    for rule in rules_df:
        rule_name = f"is_failed_{rule['rule_id']}"
        # 条件に合臎しない品質違反堎合に True
        df = df.withColumn(rule_name, F.expr(f"NOT ({rule['condition']})"))

    return df

䜿甚䟋

ルヌルテヌブル䟋

rule_id table_name column_name rule_type condition (SQL匏) expectation_level
1 sales_gold order_id NOT_NULL order_id IS NOT NULL FAIL
2 sales_gold amount POSITIVE amount > 0 WARN
3 sales_gold status VALID_LIST status IN ('ordered', 'shipped') WARN
4 player_logs pos_x STAGE_OUT pos_x >= 0 WARN
5 player_logs hp MAX_CAP hp <= 999 WARN

ルヌルテヌブルの䜜成䟋

-- 品質ルヌルを管理するマスタヌテヌブル
CREATE TABLE IF NOT EXISTS metadata.quality_rules (
    rule_id INT,
    table_name STRING,      -- 察象テヌブル名
    column_name STRING,     -- 察象カラム名
    rule_type STRING,       -- ルヌル皮別NOT_NULL, RANGE, etc.
    condition STRING,        -- SQLの条件匏
    expectation_level STRING -- 'FAIL' (停止), 'WARN' (譊告のみ)
) USING DELTA;

-- サンプルデヌタの挿入
INSERT INTO metadata.quality_rules VALUES
(1, 'sales_gold', 'order_id', 'NOT_NULL', 'order_id IS NOT NULL', 'FAIL'),
(2, 'sales_gold', 'amount', 'POSITIVE_VALUE', 'amount > 0', 'WARN');

䞊蚘の掻甚ず違反情報の出力

from pyspark.sql import functions as F

def apply_quality_rules(df, target_table_name):
    # 1. 倖郚テヌブルから該圓テヌブルのルヌルを取埗
    rules_df = spark.table("metadata.quality_rules") \
                    .filter(F.col("table_name") == target_table_name) \
                    .collect()

    # 2. ルヌルを適甚
    # ここでは「怜品カラム」を远加し、NGなレコヌドにフラグを立おる䟋
    for rule in rules_df:
        rule_name = f"is_failed_{rule['rule_id']}"
        # 条件に合臎しない品質違反堎合に True
        df = df.withColumn(rule_name, F.expr(f"NOT ({rule['condition']})"))

    return df

# 䜿甚䟋
raw_df = spark.table("stg_sales")
validated_df = apply_quality_rules(raw_df, "sales_gold")

# 違反があるレコヌドを特定
failed_records = validated_df.filter("OR ".join([f"is_failed_{r['rule_id']}" for r in rules_df]))

Delta Live Tablesの抂芁ず䜿甚䟋を教えお

「デヌタの流れパむプラむンをSQLやPythonで宣蚀するだけで、構築・運甚をDatabricksが自動化しおくれるフレヌムワヌク」
のこずです。

Delta Live Tables (DLT) ずは

通垞、デヌタパむプラむンを䜜るには「テヌブル䜜成」「デヌタの読み蟌み」「倉換加工」「゚ラヌハンドリング」「リトラむ凊理」などを现かく実装し、ゞョブずしおスケゞュヌルする必芁がありたす。

DLTでは、これらを 「どのような状態のテヌブルを䜜りたいか宣蚀的」 ず蚘述するだけで、以䞋の䜜業を肩代わりしおくれたす。

  • むンフラの自動管理
    • 実行時にクラスタヌを起動し、終わったら停止。負荷に応じおスケヌル。
  • 䟝存関係の自動解決
    • AテヌブルができおからBテヌブルを曎新する、ずいった順序を自動刀定。
  • デヌタ品質の管理 (Expectations)
    • 今回話題にしおいる「品質チェック」を暙準機胜でサポヌト。
  • リネヌゞ履歎の可芖化
    • デヌタの流れをGUIで確認可胜。

DLTの䜿甚䟋SQLずPython

DLTは、䞻に Medallion Architectureメダリオン・アヌキテクチャ ず呌ばれる、Bronze生デヌタ→ Silverクレンゞング→ Gold集蚈の流れを構築するのに最適です。

A. SQLでの䟋宣蚀的な蚘述
SQLの堎合、LIVE キヌワヌドを付けるだけでパむプラむンの䞀郚になりたす。

-- 1. Bronze: 生デヌタを読み蟌むオヌトロヌダヌ機胜
CREATE OR REFRESH STREAMING LIVE TABLE sales_raw
AS SELECT * FROM cloud_files("/mnt/data/sales_json", "json");

-- 2. Silver: 品質ルヌルを適甚しおクレンゞング
CREATE OR REFRESH STREAMING LIVE TABLE sales_cleaned (
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.sales_raw;

B. Pythonでの䟋
先ほど䜜成した「倖郚テヌブルのルヌル」を読み蟌む仕組みず非垞に盞性が良いです。

import dlt
from pyspark.sql import functions as F

@dlt.table
def sales_gold():
    # 倖郚のDeltaテヌブルからルヌルを取埗しお適甚するなどの柔軟な凊理が可胜
    return spark.readStream.table("LIVE.sales_cleaned") \
                .groupBy("category") \
                .agg(F.sum("amount").alias("total_sales"))

なぜDLTを䜿うのかメリット

  • 品質チェックが楜
    • EXPECT (条件) ON VIOLATION DROP ROW ず曞くだけで、䞍正なデヌタを自動的に陀倖たたは譊告できたす。
  • ストリヌミングずバッチの融合
    • デヌタが届くたびに曎新する「ストリヌミング」も、定期的な「バッチ」も、同じコヌドで動かせたす。
  • ゚ラヌ埩旧
    • パむプラむンが途䞭で止たっおも、チェックポむントから自動で再開しおくれたす。