はじめに
DatabricksのCertified Data Engineer Associate試験でよく問われる項目、
理解が甘かったので学習しなおした項目をまとめました。
項目ごとに簡潔に分けているので、ざっと振り返りたい場合などにぜひご活用ください
単純にDatabricksの便利機能を知りたい場合も活用できると思います
.groupBy(“id”).agg(F.sum(“amount”).alias(“total”))
で、Fがないパターンもできる?
aliasは必ず必要?
以下の感じでも書ける
from pyspark.sql.functions import sum
# F. なしで実行可能
df.groupBy("id").agg(sum("amount").alias("total"))
以下でやればカラム名は sum(amount) となる
# alias なし
df.groupBy("id").agg(F.sum("amount"))
OPTIMIZEを2回連続で行う意味はある?
同じテーブルに対してデータ更新がない状態で OPTIMIZE を2回連続で行う意味は、基本的にはありません。
Lakehouse Federationとは
Unity Catalogが司令塔となり、MySQL、PostgreSQL、Snowflake、BigQueryといった外部ソースに対して「コネクション」を張り、それらをDatabricks上の「外部カタログ(Foreign Catalog)」としてマウントします。
| 比較項目 | 外部テーブル (External Table) | Lakehouse Federation |
|---|---|---|
| 接続先 | クラウドストレージ (S3 / ADLS / GCS) | 外部DB (MySQL, Snowflake, BigQuery等) |
| データ形式 | ファイル (Delta, Parquet, CSV, JSON) | DB内のテーブル (各DBの独自形式) |
| 計算エンジン | Databricks がファイルを読み取って処理 | 接続先DB が処理して結果だけを返す |
| パフォーマンス | 高速 (特にDeltaなら最適化が効く) | 接続先DBの性能に依存 (低速になりがち) |
| 運用負荷 | スキーマ定義やパス指定の管理が必要 | 接続設定(Connection)を作るだけでOK |
外部テーブルを作る場合はManaged が必要? Create Tableだけ?
外部テーブルを作る際にManagedは必要ありません。
むしろ、LOCATION 句を書くか書かないかで、どちらになるかが決まります。
| テーブルの種類 | SQLの書き方 | データの保存場所 | テーブルを DROP すると? |
|---|---|---|---|
| Managed Table | CREATE TABLE ... |
Databricksが管理する専用ストレージ | メタデータもデータファイルも消える |
| External Table | CREATE TABLE ... **LOCATION 's3://...'** |
自分で指定したパス(S3/ADLSなど) | メタデータだけ消え、データファイルは残る |
ヒストリカルデータはBronzeに入れるべき?
結論から言うと、「はい、ヒストリーデータ(履歴データ)は必ずBronzeレイヤーに保持しておくべき」です。
Photonとは、どういう問題が問われがち?
- 問われる点: 従来のSparkエンジン(JVM/Java)との違い。
- 正解: Photonは C++ で実装されており、ベクトル化実行(Vectorized Execution) という技術を使っています。
- 理由: Javaのメモリ管理(ガベージコレクション)のオーバーヘッドを避け、CPUの性能(SIMD命令など)を限界まで引き出すためです。
- 問われる点: どのコンピューティングタイプでPhotonが使えるか?
- 正解: SQL Warehouse: ProとServerlessではデフォルトで有効(オフにできない)。
- All-Purpose / Job Cluster: クラスター作成時にチェックボックスで有効化できる。
Databricks jobの中で、コンピュートリソースは自動でスケールインアウトされる?
はい、自動でスケールイン・アウト(ノードの増減)が可能です。
spark.read.json(“パス”)またはspark.read.format(“json”).load(“パス”)でjson読める?
どちらの方法でも正しくJSONファイルを読み込むことができます。
コンテナからjsonデータを読む場合はどんな感じのコマンドになる?.format(“cloud”)とかは使う?
format("cloud") という指定は存在しません
スキーマが変更されてたら落としたい場合は、どんなオプションが使える?faillon hogehogeなどがある?
「スキーマが変わったら勝手に進めずに、エラーを出して止まってほしい」という場合に使うオプションは cloudFiles.schemaEvolutionMode で、値に failOnNewColumns を指定します。
REFRESH TABLEの意味
Databricks(Spark SQL)における REFRESH TABLE とは、一言で言うと「テーブルのメタデータ(管理情報)を最新状態に更新して、キャッシュをクリアするコマンド」のことです。
「データがそこにあるはずなのに、クエリを投げても反映されない!」という時に使う、いわば「情報の再読み込みボタン」のような役割を果たします。
Delta Sharingを行う場合、UnityのメタストアIDの共有が必要?
「Databricks間(UC-to-UC)で共有を行う場合」は、受信側のメタストアID(正確には「共有識別子」)の共有が必須
Unity CatalogのLineage(データリネージ)機能?
一言でいうと「データの家系図」を自動で記録・可視化してくれる機能のことです。
「このテーブルのデータはどこから来たのか?」「このカラムを削除したら、どのレポートが壊れるか?」といった疑問に、コードを読み解くことなく一瞬で答えてくれます。
1. Lineageでわかること
Unity Catalogのリネージは、単なる「テーブル間のつながり」以上の情報を見せてくれます。
- テーブルレベルのリネージ: どのソーステーブルから、どのターゲットテーブルが作られたか。
- カラムレベルのリネージ: (これが非常に強力!)特定のカラムが、元データのどのカラムを組み合わせて計算されたものか。
- エンティティ間のつながり: テーブルだけでなく、それを処理したノートブック、実行されたジョブ、最終的に表示しているダッシュボード(AI/BI)まで繋がって見えます。
- AIモデルのリネージ: そのモデルがどのデータセットで学習されたか(Feature Storeとの連携)。
All-Purpose Clusterとは
データサイエンティストやエンジニアが「開発や実験、分析のために自由に、かつインタラクティブに使うための計算リソース」のことです。
Databricksのノートブックを開いて、コードを1行ずつ実行しながら結果を確認する……そんな時に背後で動いているのがこのクラスターです。
| 項目 | All-Purpose Cluster | Job Cluster |
|---|---|---|
| 主な用途 | 開発・分析・デバッグ | 本番運用・定期実行(ETL) |
| 起動タイミング | ユーザーが手動(またはノートブック接続時) | ジョブの開始時に自動生成 |
| 終了タイミング | ユーザーが手動(または自動停止設定) | ジョブ完了時に自動で破棄 |
| コスト(DBU) | 高い | 安い(約半分程度) |
| 永続性 | 停止しても設定は残る | ジョブごとに使い捨て |
dlt.viewとdlt.tableの違い
| 特徴 | dlt.table (テーブル) | dlt.view (ビュー) |
|---|---|---|
| データの永続性 | ストレージに Delta形式で保存される | ストレージに 保存されない |
| 外部からの参照 | 他のノートブックやBIツールから参照可能 | DLTパイプライン内でのみ使用可能 |
| 計算のタイミング | パイプライン実行時に一度だけ計算 | 後続の処理が動くたびに再計算 |
| 主な用途 | 最終結果、中間バックアップ、外部公開用 | 中間処理(クリーニング、機密情報の削除) |
| ストレージコスト | 発生する | 発生しない |
dlt.view を使うべき時
「この処理の結果を、パイプラインの外で見せる必要はない」という場合に最適です。
- 中間加工: カラム名の変更や型変換など、次のステップのための「下準備」。
- フィルタリング: 無効なデータを取り除く処理。
- コスト削減: ストレージへの書き込み(I/O)が発生しないため、不要なデータ保存を避けてパイプラインを高速化できます。
dlt.table を使うべき時
「データとして残しておきたい」または「外部から見たい」場合に必須です。
- メダリオンの各層: Bronze, Silver, Gold の各段階。
- BIツールでの利用: ダッシュボードなどで可視化したい最終データ。
- 複雑な再計算の防止: 非常に重い計算(大きなテーブル同士のJOINなど)の結果を保存しておき、後続の処理で何度も使い回したい場合。
Databricksの開発を好きなIDEで行うことは可能?
1. 「コードだけ送る」スタイル(VS Code拡張機能 / DABs)
これは「環境の再現」ではなく、「編集は手元、実行はあっち」という分業制です。
- 仕組み: ローカルのIDEで書いたファイルを、保存した瞬間にDatabricksのワークスペース(リモート)へ同期します。
- 実行: 「実行」ボタンを押すと、Databricks上のクラスターに対して「今送ったこのファイルを実行せよ」という命令が飛びます。
- メリット: ローカルPCの性能が低くても、クラウドの強力なパワー(GPUや大容量メモリ)をそのまま使えます。
2. 「脳だけローカル」スタイル(Databricks Connect)
これが「環境の再現」に一番近い感覚かもしれません。
- 仕組み: ローカル環境に
databricks-connectというライブラリを入れます。これはSparkの「司令塔(ドライバー)」の役割をローカルで代行するものです。 - 再現性: ローカルのPython環境(ライブラリのバージョンなど)を、Databricksクラスターのランタイム(DBR)と厳密に一致させることで、手元で動かしている感覚でリモートのワーカーを操れます。
- メリット: IDEのデバッガー(ステップ実行)が完璧に機能します。
Databricksデバッガーって変数見える?それはSpark CLIの機能でも提供されてる?
変数の値をリアルタイムで「見る」ことができます。
Databricksのデバッガーは、単にコードを止めるだけでなく、その時点でのメモリ上の状態を可視化することに優れています。ただし、Spark CLI(コマンドライン)とは役割も機能も全く異なります。
| 機能 | Databricks デバッガー | Spark CLI (pyspark / spark-shell) |
|---|---|---|
| インターフェース | GUI(ブラウザ / VS Code) | テキスト(ターミナル) |
| 変数の確認方法 | 専用パネルで一覧表示 | print() や dir() を打って手動確認 |
| ブレークポイント | クリック一つで設定可能 | pdb.set_trace() などのコード埋め込みが必要 |
| 主な目的 | 複雑なロジックの修正・デバッグ | 簡単な動作確認・小規模な命令実行 |
CPU時間ってなに、これがジョブ時間より長いと何が問題なことが多い?
結論から言うと、並列処理を行うDatabricks(Spark)では「CPU時間 > ジョブ時間」になるのは正常な状態ですが、その「倍率」や「中身」がおかしい場合に問題が発生しています。
1. CPU時間とジョブ時間の違い
- ジョブ時間(Wall-clock Time / Elapsed Time):
あなたがストップウォッチで測った「実際の待ち時間」です。ジョブが始まってから終わるまでのカレンダー上の時間です。 - CPU時間(CPU Time):
クラスター内の全CPUコアが働いた時間の合計です。
[!TIP]
「人月(マンアワー)」の概念と同じです。
- 10人がかりで1時間かかる作業をした場合:
- ジョブ時間 = 1時間
- CPU時間 = 10時間(10人 × 1時間)
2. 「CPU時間 > ジョブ時間」は普通?
はい、並列処理をしているなら絶対にCPU時間の方が長くなります。
もし8コアのクラスターを使っていて、CPU時間がジョブ時間の約8倍であれば、全コアを効率よく使い切っている「健康な状態」と言えます。
Delta sharingの場合、リージョンが異なると追加料金かかる?
はい、追加料金(主にクラウドストレージのデータ転送料/エグレス料金)が発生します。
これはDatabricksの利用料(DBU)というよりも、データを保持しているクラウドプロバイダー(AWS, Azure, GCP)側から請求される費用が主な要因です。
有効なジョブ定義をする場合、resource配下はyaml?json?
「YAML(ヤムル)」 です。
rescueとは
cloudFiles.schemaEvolutionMode の一つ、「rescue(レスキュー)」モード、およびそこで作成される 「レスキューデータカラム(_rescued_data)」 のこと
「スキーマに合わなかったり、予定になかったデータ」を捨てずに、専用の避難シェルター(カラム)に放り込んで守る機能
Auto Loader で rescue モードを有効にすると、テーブルに _rescued_data という名前の隠しカラム(JSON形式)が自動で作られます。
Kafkaを読み込む場合はreadfrom(Kafka)?
readfrom ではなく、.format(“kafka”) を使います
df = (spark.readStream
.format("kafka") # 1. 形式をkafkaに指定
.option("kafka.bootstrap.servers", "host:port") # 2. 接続先サーバー
.option("subscribe", "topic_name") # 3. 購読するトピック名
.option("startingOffsets", "earliest") # 4. どこから読み始めるか
.load())
監査ログってcsv?json?xml?
Databricksの監査ログ(Audit Logs)の標準的なフォーマットは JSON です
DLTのメリットの一つの宣言的パイプラインとは、そうでないものとの違いも含めて解説して
Delta Live Tables(DLT)の最大の特徴である「宣言的(Declarative)パイプライン」。これは、従来のデータエンジニアリングの手法を劇的に変えた考え方です。
一言でいうと、「『どうやって(How)』動かすかではなく、『何(What)』を作りたいかを記述する」
スタイルを指します。
対照的な概念である「命令的(Imperative)パイプライン」と比較して解説します。
1. 「命令的」 vs 「宣言的」の違い
- 命令的: 「読み込んで、加工して、保存せよ」という手順書。
- 宣言的: 「このデータからこのテーブルを作れ」という定義書。
命令的パイプライン(従来のSparkノートブックなど)
「手順」を一つずつ指示するスタイルです。
- 特徴: 料理のレシピでいうと「まずお湯を沸かして、次に麺を3分茹でて、その間にスープを作って……」と手順の順番と方法をすべて書きます。
- コードの例:
- クラスターを起動する。
- S3からファイルを読み込む。
- データを加工する。
- Deltaテーブルに保存する。
- 失敗したらリトライするコードを書く。
- 課題: 依存関係(Aが終わったらBをやる)やエラー処理、リソース管理をすべて自分でプログラミングしなければなりません。
宣言的パイプライン(DLT)
「完成図」を定義するスタイルです。
- 特徴: レシピではなく「美味しいラーメンが食べたい(完成状態)」と注文するだけです。お湯をいつ沸かすか、火加減をどうするかはシステム(DLT)が判断します。
- コードの例:
- 「ソースはこのフォルダのJSON」
- 「テーブルAは、ソースをフィルタリングしたもの」
- 「テーブルBは、テーブルAをJOINしたもの」
- メリット: 実行順序の制御やインフラの管理をDLTに丸投げできます。
| 項目 | 命令的(Standard Spark) | 宣言的(DLT) |
|---|---|---|
| 読み込み | spark.read.format("json").load(...) |
dlt.read_stream("source_name") |
| 書き込み | df.write.format("delta").saveAsTable(...) |
@dlt.table(デコレータで定義) |
| 依存関係 | ノートブックのセル順や外部Jobツールで制御 | システムがコードを解析して自動判別 |
今回使うサンプルデータ
以下のような、ECサイトの生データ(JSON)を想定します。
{"order_id": 101, "amount": 2500, "status": "shipped"}
{"order_id": 102, "amount": -500, "status": "error"} // ← 不正データ(金額がマイナス)
{"order_id": 103, "amount": 1200, "status": "shipped"}
やりたいこと:
- Bronze: 生データをそのまま取り込む
- Silver: 金額が 0 以上の正常なデータだけを抽出する
1. 命令的パイプライン(従来の手法)
「どうやって処理するか」をステップごとに記述します。
# 手順1: データを読み込む
df_raw = spark.read.json("/path/to/raw_sales")
# 手順2: フィルタリング処理(ロジック)
df_cleaned = df_raw.filter("amount >= 0")
# 手順3: 保存(パスやフォーマット、チェックポイントを個別に指定)
df_cleaned.write.format("delta").mode("append").save("/path/to/silver_sales")
# 手順4: (運用者がやること)
# このノートブックを毎日10時に実行するように「ジョブ」を設定し、
# 失敗した時のリトライ設定や、クラスターのサイズ管理も自分で行う。
[!CAUTION]
問題点: もし「Silverの後にGoldテーブルを追加したい」と思ったら、コードを書き換えるだけでなく、外部のオーケストレーター(Jobツールなど)で実行順序を再構成しなければなりません。
2. 宣言的パイプライン(DLTの手法)
「何を作りたいか(定義)」だけを記述します。
import dlt
# 1. Bronzeテーブルを定義(Auto Loaderを使用)
@dlt.table
def sales_bronze():
return spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/path/to/raw_sales")
# 2. Silverテーブルを定義(品質チェックも宣言するだけ!)
@dlt.table
@dlt.expect_or_drop("valid_amount", "amount >= 0") # 不正データは落とせ、と宣言
def sales_silver():
return dlt.read("sales_bronze") # 前のテーブルを「読む」と書くだけで依存関係が成立
宣言的であることの「実利」はここ!
メリットA:依存関係の自動解決(「次は何?」を考えなくていい)
- 命令的: 「Aの保存が終わったらBを動かせ」という順序を人間が管理します。
- 宣言的: DLTのコード内で
dlt.read("sales_bronze")と書いた瞬間に、システムが「あ、先にBronzeを更新してからSilverを動かさなきゃな」と自動で判断し、最短ルートで実行します。
メリットB:データ品質の「見える化」
- 命令的: フィルタリングで何件落ちたかを知るには、自分でログを出力するコードを書く必要があります。
- 宣言的:
@dlt.expect...と宣言するだけで、Databricksの管理画面に「何件のデータがルール違反でドロップされたか」というダッシュボードが自動生成されます。
メリットC:スキーマ変更やリプロセスの容易さ
- 命令的: 過去1年分のデータをやり直したい(リプロセス)場合、手動でフォルダを空にして、チェックポイントをリセットして……という作業が必要です。
- 宣言的: UIの 「Full Refresh」 ボタンをポチッと押すだけ。システムが「現在の定義」に合わせて、全データを安全に作り直してくれます。
結論
宣言的なパイプラインを使うと、エンジニアは「データの配管工事(インフラ管理や順序制御)」から解放され、「データにどんなビジネスルールを適用するか」という価値創出に時間を割けるようになります。