Introduction
I compiled these notes on the topics I couldn't answer or only half understood while studying for the Databricks Certified Data Engineer Professional exam.
Each topic is kept short and self-contained, so please use this whenever you want a quick review. It should also be useful if you simply want a tour of Databricks' handier features.
Differences between Liquid Clustering and Z-Order
| Comparison | Z-Order (legacy approach) | Liquid Clustering (current) |
|---|---|---|
| Available since | Long-standing feature | Databricks Runtime 13.3+ |
| Main purpose | Speeding up multi-dimensional filtering | Balancing write/read performance and automating operations |
| Maintenance | Frequent manual OPTIMIZE runs | Incremental optimization greatly reduces management cost |
| Write performance | High overhead from data reordering | Efficient clustering keeps writes fast |
| Partitioning | Usually combined with physical partitions | Makes physical partitioning unnecessary (recommended) |
| Flexibility | Changing key columns requires rebuilding all data | Clustering keys can be changed dynamically |
| Recommended for | Mostly static, small-scale data | Large data, frequent updates, high-dimensional filters |
- If wording like "small-file fragmentation" or "skew" appears in a question, Liquid Clustering is very likely the correct answer.
- If the context is "filter efficiently on multiple columns, not just one," both techniques qualify, but if the question asks for the current best practice, the answer is Liquid Clustering.
When should mergeSchema=true be used?
| Item | Description |
|---|---|
| Main use | Appending or updating data that contains new columns into an existing table |
| Default behavior | Spark/Delta Lake raises an error on schema mismatch by default (a safety feature) |
| Example command | df.write.format("delta").mode("append").option("mergeSchema", "true").save(path) |
| What happens | New columns are added automatically to the target table's metadata as part of the write |
| Constraints | Cannot change an existing column's data type (e.g., String → Int); that requires an overwrite |
| Recommended scenario | Ingestion into layers such as Bronze, where source fields are added frequently |
- Adding a new column (Append)
  When a new column (e.g., new_feature) appears in the source data, the write stops with a "Schema mismatch" error unless this option is set. Setting it to true expands the existing table structure dynamically; a minimal sketch follows this list.
- Use with MERGE
  When running a SQL MERGE INTO and you want columns that exist only on the source side reflected on the target, enabling schema evolution creates the columns automatically instead of writing SET T.new_col = S.new_col by hand.
- Structured Streaming
  When the source schema may change during stream processing, use it together with checkpoints to tolerate schema changes.
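A minimal sketch of the append case, assuming a Delta table already exists at a hypothetical path base_path (table and column names are illustrative):
# Existing table has (id, name); the new batch also carries new_feature
new_batch = spark.createDataFrame(
    [(1, "alice", 0.9)],
    ["id", "name", "new_feature"],
)

(new_batch.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # without this, the write fails with a schema mismatch
    .save(base_path))               # new_feature is added to the table schema automatically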
Differences between expect and expect_or_drop
| Expectation | Behavior when data violates the constraint | Record handling | Pipeline as a whole |
|---|---|---|---|
| expect | Counted in metrics; processing continues | Written to the target as-is | Succeeds (warning only) |
| expect_or_drop | Violating records are discarded | Not written to the target | Succeeds (records dropped only) |
| expect_or_fail | The pipeline stops immediately with an error | The write itself fails | Fails |
Exam tips
Exam questions sometimes present the scenario "we want to ensure data quality but avoid stopping the pipeline." The correct answer there is expect_or_drop.
Conversely, if the requirement is "not even a single bad record may get in," choose expect_or_fail.
Usage examples for expect and expect_or_drop
Concrete pipeline design examples
1. Using expect: data quality monitoring
Use it when loading data into the Bronze layer to catch source-system defects early.
- Scenario: user IDs are occasionally missing, but the system must not stop.
- Code image (SQL):
CONSTRAINT valid_user_id EXPECT (user_id IS NOT NULL)
- Result: all records are saved, and the Databricks "Pipeline Details" screen shows visually what percentage of the data was NULL.
2. Using expect_or_drop: data cleansing
Use it when building Silver-layer tables for analytics, so downstream ML models and aggregations don't break.
- Scenario: exclude clearly wrong data such as negative ages or ages over 200.
- Code image:
CONSTRAINT realistic_age EXPECT (age > 0 AND age < 150) ON VIOLATION DROP ROW
- Result: only data in the normal range remains in the table; anomalous data is filtered out automatically.
Pitfalls the exam likes to probe
Exam questions may ask you to make judgments like the following.
- "Where do dropped records go?"
  By default, the contents of dropped records are not kept in the target table. If you want to inspect what was dropped, you must either use expect and add a validity-flag column, or build logic that routes bad records to a separate Quarantine table.
- "What if there are multiple constraints?"
  You can mix expect and expect_or_drop on a single table. In that case, a record is discarded as soon as it violates even one drop condition.
Where does constraint-violating data go for each expectation?
| Construct | Where the data goes | Metrics (log) record |
|---|---|---|
| expect | Saved to the target table | Recorded as a violation count |
| expect_or_drop | Discarded (saved nowhere) | Recorded as a drop count |
| expect_or_fail | Not saved (the run itself fails) | Recorded as the failure cause |
| Quarantine pattern | Saved to a separate "quarantine table" | Must be implemented as your own logic |
How data looks with expect
When you query a table that uses expect, violating records appear mixed in with normal records.
- How to check: the "Data Quality" tab in the DLT pipeline UI shows statistics such as what percentage of records violated each constraint.
- Use case: suited to operations where you load everything first and filter violations out later with a WHERE clause at analysis time.
2. Sending bad data to a separate table (quarantine strategy)
If the requirement, in the exam or in practice, is "don't throw bad data away, but don't let it into the clean table either," DLT requires you to hand-write logic that splits the flow into two tables (see the sketch after this list).
- Clean table: use expect_or_drop to keep only valid data.
- Quarantine table: invert the condition and extract the bad records with expect (or a filter), keeping them for error investigation.
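A minimal sketch of this pattern, assuming a hypothetical upstream DLT table raw_orders with an amount column (all names are illustrative):
import dlt
import pyspark.sql.functions as F

@dlt.table(name="orders_clean", comment="Valid records only")
@dlt.expect_or_drop("positive_amount", "amount > 0")
def orders_clean():
    return dlt.read_stream("raw_orders")

@dlt.table(name="orders_quarantine", comment="Violating records kept for investigation")
def orders_quarantine():
    # Invert the clean-table condition to capture what was dropped
    # (records where amount is NULL would need separate handling)
    return dlt.read_stream("raw_orders").filter(~(F.col("amount") > 0))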
How do you actually access the detail data behind expect and expect_or_drop?
3. Checking the event log
The statistics on which records failed are stored in a special place that Databricks manages automatically: the event log.
-- Example: querying data quality metrics from the event log
SELECT
id,
expectations.dataset,
expectations.name AS constraint_name,
expectations.passed_records,
expectations.failed_records
FROM (
SELECT
id,
explode(from_json(details:flow_progress:data_quality:expectations, 'array<struct<dataset:string, name:string, passed_records:int, failed_records:int>>')) AS expectations
FROM event_log
WHERE event_type = 'flow_progress'
)
Example:
| id (run ID) | dataset (table) | constraint_name | passed_records | failed_records |
|---|---|---|---|---|
| a1b2-c3d4... | raw_sales | valid_id | 1000 | 0 |
| a1b2-c3d4... | raw_sales | positive_price | 995 | 5 |
| e5f6-g7h8... | clean_users | email_not_null | 450 | 2 |
How WINDOW functions relate to the WITH clause
1. What the WITH clause (Common Table Expression / CTE) means
Think of a WITH clause as defining a temporary table before the main query runs.
- Intent: a declaration that "I will refer to the result of this complex computation by the name temp_table."
- Benefit: it eliminates nested subqueries, so the query reads top to bottom.
2. What a WINDOW function means
Unlike aggregation (GROUP BY), a WINDOW function keeps the original rows and computes over the surrounding data related to each row.
- Intent: computations like "the average of the 3 rows before and after the current row" or "my rank within my department."
- Benefit: you can put row-level data and aggregate values side by side without repeated JOINs.
3. A concrete example combining the WITH clause and WINDOW functions
The most common pattern, in practice and in the exam, is: rank rows with a WINDOW function inside a WITH clause, then filter on that rank in the main query.
Example in SQL
Extracting only the top 3 earners in each department:
-- 1. Build a ranked temporary table with a WITH clause
WITH ranked_employees AS (
SELECT
name,
dept,
salary,
    -- Compute the per-department rank with a WINDOW function
DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank
FROM employees
)
-- 2. Filter on the rank in the main query
SELECT * FROM ranked_employees
WHERE rank <= 3;
Why is the combination necessary?
Under SQL's rules of execution order, you cannot use a WINDOW function's result (rank above) directly in a WHERE clause. So you first materialize "a table that has a rank column" with the WITH clause, then filter on the outside.
| Feature | Role | Main purpose | Exam keywords |
|---|---|---|---|
| WITH clause (CTE) | Defines a temporary result set | Splitting complex queries, readability | Reuse, query organization, avoiding nesting |
| WINDOW function | Computation within a "frame (window)" of rows | Moving averages, ranking, running totals | OVER, PARTITION BY, ORDER BY |
The concrete difference
1. Starting with SELECT (using a subquery)
This approach produces the result in one go, but you must read it from the inside out.
SELECT name, dept, salary
FROM (
    -- Build the ranked "intermediate state" here
SELECT name, dept, salary,
RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rnk
FROM employees
)
WHERE rnk = 1;
How the table evolves
- Source data (employees): name, department, and salary columns.
- Inside the subquery: an rnk column is computed and attached beside each row.
- Final result: the outer WHERE rnk = 1 removes every row that isn't rank 1.
2. Starting with a WITH clause
This approach reads like a recipe: "first build the ranking table, then extract from it."
- STEP 1: define the ranked "virtual table":
WITH RankedTable AS (
  SELECT name, dept, salary,
         RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rnk
  FROM employees
)
- STEP 2: extract rank 1 from that virtual table:
SELECT name, dept, salary FROM RankedTable WHERE rnk = 1;
STEP 1: building [RankedTable] (a virtual table in memory)
| name | dept | salary | rnk (WINDOW function result) |
|---|---|---|---|
| Tanaka | Sales | 500,000 | 1 |
| Sato | Sales | 450,000 | 2 |
| Suzuki | Engineering | 600,000 | 1 |
| Takahashi | Engineering | 580,000 | 2 |
STEP 2: [final output] (filtered by WHERE rnk = 1)
| name | dept | salary |
|---|---|---|
| Tanaka | Sales | 500,000 |
| Suzuki | Engineering | 600,000 |
Does hash(email) exist? When hashing emails so that each input maps to a stable output, what kind of function is desirable?
hash is not an encryption function.
The candidates:
| Technique | Function (SQL/Python) | Characteristics | Suited use case |
|---|---|---|---|
| Standard hash | sha2(email, 256) | High consistency and collision resistance; the industry standard | Durable anonymous IDs, table join keys |
| Fast hash | md5(email) | Fast, but weak security | Duplicate checks, cache keys |
| Encryption (reversible) | aes_encrypt | The original email can be recovered with the key | When the raw data is needed for audits or incidents |
| Masking | mask(email) | Hides part of the value with *; not a hash | Display on dashboards |
Is maximum security provided by OAuth and service principals?
Correct. From a security and governance standpoint, service principals are recommended over PATs (personal access tokens).
If a job was defined in a bundle, can it be run with the run command (using the -t option with prod)?
In short: yes, it can be run with the databricks bundle run command, and as the question suggests, specifying the target (environment) with the -t (or --target) option is the standard procedure.
Specification and options of databricks bundle run
| Command / option | Role | Example |
|---|---|---|
| databricks bundle run | Runs a job defined in the bundle immediately | databricks bundle run [job_key] |
| -t (or --target) | Specifies an environment (dev, prod, etc.) defined in databricks.yml | -t prod |
| job_key | The identifier of the job defined under resources/jobs | my_sample_job |
| --refresh-all | Refreshes the state of pipelines (DLT) etc. when running | --refresh-all |
If your databricks.yml defines a target called prod and a job called main_job, the command looks like this:
# Run the job in the prod environment
databricks bundle run -t prod main_job
When an unwanted parameter was silently added to a mid-workflow task, what rerun style is desirable?
When a Databricks job (workflow) fails at a mid-pipeline task, and the cause was a configuration mistake such as an unintended extra parameter, the most efficient and desirable rerun style is the Repair Run (rerun from the failed task).
| Rerun style | What it does | Pros | Cons |
|---|---|---|---|
| Repair Run (retry from the failed task) | Runs only the failed task and its downstream tasks | Saves time and compute; doesn't repeat succeeded tasks | Doesn't help if the root cause is bad upstream data |
| Run Now (rerun everything) | Restarts the workflow from the beginning | Data consistency is fully guaranteed | Expensive: heavy steps that already succeeded (Bronze→Silver etc.) run again |
| Manual notebook run | Open, fix, and run individual notebooks | Free-form debugging | Cannot reproduce job dependencies or parameter passing |
The desirable response flow in this case
If the run failed because of a parameter misconfiguration, the following steps are best practice.
- Fix the configuration: first remove or correct the offending parameter in the job settings (or in the Bundles source code).
- Choose Repair Run:
  - In the Databricks UI, on the job run details screen, click "Repair Run".
  - Processing resumes from the failed task, using the corrected settings.
- Partial parameter overrides (if needed):
  - When repairing, it is also possible to pass specific parameters for just that one run.
Why is Repair Run desirable?
Databricks workflows keep track of which tasks have succeeded.
- Efficiency: upstream tasks that already finished (e.g., a large data load) are skipped, so recovery time is minimal.
- Idempotency: in data engineering, the rule of thumb is to never repeat a task that already succeeded, to avoid double ingestion.
Which is the higher-level concept, task or job?
The job is the larger unit.
The structure is: one big box called a "job" contains multiple "tasks," the individual units of work that get executed.
Job/task hierarchy
| Unit | Role | Example |
|---|---|---|
| Job | The management unit for the whole workflow; owns schedules and notifications | a "daily sales aggregation pipeline" |
| Task | An individual execution step within a job | "extract data", "transform", "run model inference" |
Is the command to run a job run or execute?
In the Databricks CLI and API, the official verb for running a job is run.
There is no execute command; both Databricks Asset Bundles (DABs) and the regular CLI use run.
Command landscape for running jobs
| Context | Command | Use |
|---|---|---|
| DABs (bundles) | databricks bundle run | Launches a job deployed via a bundle |
| Databricks CLI | databricks jobs run-now | Immediately runs an existing job (by Job ID) |
| REST API | POST /api/2.1/jobs/run-now | Lets external systems request a job run |
| SQL (for reference) | EXECUTE ... | Used for things like stored procedures in some SQL dialects; not for jobs |
In the YAML that defines a bundle, how do the permissions and job levels nest?
resources:
jobs:
    my_analysis_job: # job identifier
name: "Daily Analysis Job"
tasks:
- task_key: "run_notebook"
notebook_task:
notebook_path: "./notebook.ipynb"
      # this is the permissions level
permissions:
- group_name: "data-engineers"
level: "CAN_MANAGE"
- group_name: "analysts"
level: "CAN_VIEW"
What does Salting do, and when is it effective?
In Databricks (Apache Spark), Salting is an advanced tuning technique that breaks parallel-processing bottlenecks by eliminating data skew.
Salting overview
| Item | Description |
|---|---|
| Main purpose | Eliminating data skew (data piling up in specific partitions) |
| Operation | Append a random number (1–N) to the join key etc. to split hot keys into finer pieces |
| When effective | When specific keys (e.g., NULLs, default values, one huge customer ID) cause processing delays |
| Trade-off | The smaller table (e.g., a dimension table) must be replicated (exploded) once per salt value |
1. When is it effective? (Detecting skew)
It is very effective when symptoms like these appear:
- 99% of tasks finish in seconds, but the last one runs for minutes to hours.
- In the Spark UI "Executors" tab, one core stays busy while the others sit idle.
- A specific key (e.g., user_id = 0 or country = 'US') has tens of millions of records.
How do you actually salt?
Roughly like the following.
from pyspark.sql import functions as F
# Salt range (how many pieces to split each hot key into)
salt_bins = 5
# Table A (the skewed side): append a random number 0-4 to the join key
df_skewed = df_a.withColumn(
"salted_key",
F.concat(F.col("user_id"), F.lit("_"), F.expr(f"int(rand() * {salt_bins})"))
)
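The snippet above salts only the skewed (fact) side. A minimal sketch of the other half of the technique, assuming df_b is the smaller dimension table joined on user_id (names are illustrative): every dimension row is replicated once per salt value so each salted key finds its match.
# Table B: replicate each row once per salt value (0 .. salt_bins-1)
df_small = (df_b
    .withColumn("salt", F.explode(F.array(*[F.lit(i) for i in range(salt_bins)])))
    .withColumn("salted_key", F.concat(F.col("user_id"), F.lit("_"), F.col("salt"))))

# Join on the salted key; the hot key's rows are now spread across salt_bins partitions
joined = df_skewed.join(df_small, "salted_key")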
Are there cases where Hive beats Delta when you want to filter on many different conditions?
In short: in a modern Databricks environment, cases where "Hive is better" essentially do not exist.
Especially for ad-hoc queries and complex workloads that use many different filter conditions (WHERE clauses), Delta Lake is overwhelmingly superior.
For exam purposes, here is why Delta, not Hive (plain Parquet tables), is the right choice, from the angle of filter performance.
Delta Lake vs Hive (Parquet): filter performance
| Feature | Hive (legacy) | Delta Lake (recommended) | Effect on filtering |
|---|---|---|---|
| Data skipping | Partition level only | Per-file statistics (min/max) | Dramatically reduces data read |
| Z-Order / Liquid | Not supported | Fully supported | Speeds up multi-column filtering |
| Statistics management | Manual ANALYZE required | Collected automatically on write | Always yields good query plans |
| File size management | Small files tend to accumulate | Auto-compaction via OPTIMIZE | Better I/O efficiency, faster scans |
| Indexing | Essentially none (partitions only) | Bloom filter indexes | Faster lookups of specific values |
2. Multi-dimensional optimization (Z-Order / Liquid Clustering)
Hive partitioning is usually limited to one or two columns.
- With Delta: Liquid Clustering lets you cluster flexibly on many columns. Whether users filter by date, by customer ID, or by product category, every pattern responds quickly.
3. Bloom filter indexes (Bloom Filter Index)
If you do frequent exact-match (=) lookups on a specific high-cardinality column (e.g., IDs or serial numbers), Delta lets you create a Bloom filter. It can then determine instantly that "this file does not contain the value being searched for."
Liquid Clustering: I remember that partition keys and sort keys had to be fixed to use it. So how does it enable flexible search?
In fact, Liquid Clustering's biggest weapon is precisely that it discards the notion of a partition key and lets you swap the clustering columns (keys) freely later.
Why "flexible" rather than "fixed"?
Compared to the older approaches (Hive partitioning and Z-Order), here is why Liquid Clustering is called flexible.
| Aspect | Legacy partitioning / Z-Order | Liquid Clustering (current) |
|---|---|---|
| Key fixation | Fixed at table creation; changing it means recreating the table | Changeable at any time; keys can be swapped without rewriting existing data |
| Layout axis | Z-Order physically fixes the layout to the columns specified at each run | Clusters and optimizes incrementally, following the data distribution |
| Search flexibility | Searches on non-key columns tend to become full scans | Any combination of the clustering columns stays fast |
| Cardinality | High cardinality triggers the small-files problem | Handles high-cardinality keys (IDs etc.) with automatic size tuning |
With Liquid Clustering, you can change the keys in production with the following command.
- If, after creating the table, you decide to optimize on different columns:
ALTER TABLE table_name CLUSTER BY (new_column_1, new_column_2);
From the next OPTIMIZE run after this command, data starts being laid out according to the new keys. Being able to change the optimization axis to match business requirements, without breaking the table definition, is the greatest source of flexibility.
2. Fast "multi-dimensional" search
Z-Order is also good at multi-dimensional search, but its reordering cost grows steeply as data grows.
Liquid Clustering internally uses advanced algorithms such as Hilbert curves to keep multiple columns (up to about 4 recommended) physically co-located while preserving their correlation. As a result, whether the filter is "date only," "ID only," or "date + ID," it can precisely target just the relevant data blocks (data skipping).
3. Freedom from over-fine partitioning
Partitioning by date, for example, fragments files badly when daily volumes are small, hurting performance.
Liquid Clustering does not depend on a physical directory structure, so it sizes clusters automatically based on data volume. Designers no longer have to agonize over what granularity to partition at.
Exam tips
When the exam asks about the advantages of Liquid Clustering, these keywords map straight to correct answers:
- No partition evolution required: no partition redefinition (migration) is needed.
- Supports high-cardinality columns: no performance collapse on columns full of unique values such as customer IDs.
- Incremental clustering: each OPTIMIZE run cleverly clusters mainly the newly arrived data.
Pros and cons: Delta Live Tables vs Structured Streaming
By analogy, Structured Streaming is the engine (a component), and DLT is a self-driving car built around that engine (a framework).
DLT vs Structured Streaming comparison
| Aspect | Structured Streaming (low-level API) | Delta Live Tables (high-level framework) |
|---|---|---|
| Main role | Writing real-time/incremental processing | Managing, operating, and quality-assuring whole pipelines |
| Languages | Python, Scala, Java, SQL | Python, SQL |
| Infrastructure | Cluster setup and startup managed manually | Auto-scaling, auto-managed (serverless) |
| Dependencies | Execution order controlled externally (Workflows etc.) | Dependencies between tables resolved automatically as a graph (DAG) |
| Data quality | You implement your own checks | Declarative quality via Expectations |
| Monitoring | Spark UI and logs, checked individually | Unified UI visualizing lineage and data quality |
| Cost | Lower (high freedom, but management cost rises) | Higher (management features on top of DBUs) |
1. Structured Streaming: pros and cons
- Pros:
  - Ultimate flexibility: fine control over trigger intervals, checkpoint locations, stateful processing, and so on.
  - Reuse of existing code: it is plain Spark code, so existing libraries and logic drop straight in.
- Cons:
  - Operational burden: you manage checkpoints, restarts after cluster failures, schema evolution, etc. yourself.
  - Poor visibility: data lineage is hard to follow, and grasping the whole pipeline's state takes effort.
2. Delta Live Tables (DLT): pros and cons
- Pros:
  - Declarative development: you write what tables you want, not how to move data, and the system optimizes for you.
  - Data quality (Expectations): constraints like expect_or_drop automatically exclude or monitor bad data.
  - Automation: enhanced autoscaling and automatic retries on errors come standard.
- Cons:
  - Constraints: operations not allowed inside the DLT framework (certain libraries, complex I/O) can be restricted.
  - Learning cost: you must learn DLT-specific constructs (the live. prefix etc.) and its lifecycle.
Code example combining Delta Live Tables and Structured Streaming
import dlt
from pyspark.sql import functions as F
# --- 1. Bronze layer: ingest from an external source with Structured Streaming ---
# Uses spark.readStream + format("cloudFiles")
@dlt.table(
name="raw_game_logs",
    comment="Ingest raw logs from cloud storage using Auto Loader"
)
def raw_game_logs():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/mnt/data/events/")
)
# --- 2. Silver layer: DLT-to-DLT streaming plus data quality management ---
# dlt.read_stream reads increments from the upstream table
@dlt.table(
name="cleaned_game_logs",
    comment="Drop invalid scores and normalize the timestamp"
)
@dlt.expect_or_drop("valid_score", "score >= 0")  # data quality constraint
def cleaned_game_logs():
return (
        dlt.read_stream("raw_game_logs")  # reference the previous table as a stream
.withColumn("event_timestamp", F.to_timestamp("event_time"))
.select("user_id", "event_type", "score", "event_timestamp")
)
# --- 3. Gold layer: materialized-view-style aggregation ---
# Aggregations (window functions etc.) happen here, so it is defined as a table, not a stream
@dlt.table(
name="daily_score_summary",
    comment="Daily total score per user"
)
def daily_score_summary():
return (
        dlt.read("cleaned_game_logs")  # for aggregation, read (not read_stream) is the usual choice
.groupBy("user_id", F.window("event_timestamp", "1 day"))
.agg(F.sum("score").alias("total_daily_score"))
)
| Aspect | Streaming table | Materialized view |
|---|---|---|
| Definition (Python) | dlt.read_stream(...) | dlt.read(...) |
| SQL syntax | CREATE OR REFRESH STREAMING TABLE | CREATE OR REFRESH LIVE TABLE |
| Processing | Increments only: newly arrived data is appended to the existing table | Reflects the latest state: the result is refreshed by scanning the source table as a whole |
| Main use | Bronze/Silver layers (fast ingestion of large data, simple transforms) | Gold layer (aggregation, ranking, complex joins, reflecting latest masters) |
| Fixing past data | Fixing already-processed increments yourself is awkward (needs recomputation) | If the source changes, results are corrected automatically on the next refresh |
Bronze局㧠expectïŒèŠåã®ã¿ïŒãèšå®ãã€ã€ããã®éåã¬ã³ãŒããSilverå±€ã§ãåŸè¿œããã§é€å€ããããšããã·ããªãª
æãã¹ããŒããªæ¹æ³ã¯
ãBronzeã¯å
šä»¶å
¥ããŠãããSilverã®å®çŸ©ã§ expect_or_drop ã䜿ãã
ãšããæ§æã§ããããã«ãããBronzeã«ã¯èª¿æ»çšã®ããã¡ãªããŒã¿ããæ®ãã€ã€ãåæçšã®Silverã¯ã¯ãªãŒã³ã«ä¿ãŠãŸãã
| ã¬ã€ã€ãŒ | èšå®å 容 (Expectation) | ããŒã¿ã®ç¶æ | ç®ç |
|---|---|---|---|
| Bronze | expect |
å šã¬ã³ãŒãïŒéåå«ãïŒãä¿æ | ç£æ»ã»ãããã°ã»å ã®å§¿ã®ä¿å |
| Silver | expect_or_drop |
éåã¬ã³ãŒããèªåé€å€ | ä¿¡é Œã§ããåæçšããŒã¿ã®äœæ |
Differences between spark.readStream and dlt.read_stream
| Aspect | spark.readStream | dlt.read_stream |
|---|---|---|
| Reads from | External sources (S3, ADLS, Kafka, etc.) | Another table inside the DLT pipeline (live tables) |
| Role | Defines the pipeline's entry point | Builds dependencies within the pipeline |
| Dependency resolution | Explicit path specification required | DLT computes table ordering automatically |
| Main use | Initial ingestion with Auto Loader (cloudFiles) | Bronze → Silver → Gold propagation |
1. spark.readStream: ingesting from outside
spark.readStream is a standard Apache Spark command. Use it to pull data that lives outside DLT, from files or message queues.
- When: at the very start of the pipeline (usually the Bronze layer), reading RAW data from cloud storage.
- Characteristics: you specify a format such as cloudFiles (Auto Loader) to establish the external connection.
2. dlt.read_stream: referencing within the pipeline
dlt.read_stream is a function specific to the DLT framework.
- When: flowing data from a table already defined in DLT (e.g., bronze_table) into the next one (e.g., silver_table).
- Biggest benefit (automatic DAG construction): with this function, Databricks automatically understands that "table A must run before table B" (the dependency graph, DAG: Directed Acyclic Graph).
What is cloudFiles (Auto Loader)?
| Aspect | Description |
|---|---|
| Main role | Efficiently scans millions of files and processes only the incremental data |
| Code | spark.readStream.format("cloudFiles").load(path) |
| Detection modes | "Directory listing" and "file notification" |
| Biggest strength | Automated schema inference and schema evolution |
| Checkpoints | Records how far it has read; resumable after a stop (exactly-once) |
cloudFiles = Auto Loader?
In short: as a concept (the feature name) it is "Auto Loader"; as the identifier you write in code it is "cloudFiles".
A concrete example of testing with transform and expected/actual data
| Benefit | Description |
|---|---|
| Readability | Avoids nesting like df = func3(func2(func1(df))); reads as a straight line |
| Testability | Each function is independent, so you can feed in small "actual" data and verify the expected output |
| Fit with DLT | Inside a DLT definition you just call these functions, making logic swappable |
| Encapsulation | Complex business logic hides inside functions, keeping the main flow legible |
Concrete code example
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
# 1. Define transforms as standalone functions (reusable and testable)
def add_tax_transform(df: DataFrame) -> DataFrame:
return df.withColumn("total_price", F.col("price") * 1.1)
def filter_invalid_orders(df: DataFrame) -> DataFrame:
return df.filter(F.col("price") > 0)
# 2. Usage in an actual job or DLT definition (method chaining)
# transform() lets the steps read top to bottom
df_result = (
spark.read.table("bronze_orders")
.transform(filter_invalid_orders)
.transform(add_tax_transform)
)
# --- Sketch of the test code ---
# 1. Build the input data (actual)
input_df = spark.createDataFrame([(100,), (200,)], ["price"])
# 2. Apply the logic
actual_df = input_df.transform(add_tax_transform)
# 3. Build the expected data (expected)
expected_df = spark.createDataFrame([(100, 110.0), (200, 220.0)], ["price", "total_price"])
# 4. Compare (the test passes if they match)
assert actual_df.collect() == expected_df.collect()
When filtering on many different time ranges, is storing times as ISO 8601 strings good enough (or something else)?
Not strings (ISO 8601): always store times as a Timestamp type (internally a long value).
| Aspect | String (ISO format) | Timestamp / Date type |
|---|---|---|
| Data size | Large (e.g., 20+ characters) | Small (internally an 8-byte number) |
| Filter performance | Slow (character-by-character comparison) | Very fast (numeric comparison, well optimized) |
| Data skipping | Weak (lexicographic ordering) | Strong (accurate min/max statistics) |
| Function use | Needs to_timestamp() conversions | year(), window(), etc. work directly |
| Standardization | Time zones are error-prone | Strictly manageable in standard forms such as UTC |
Why are numbers (Timestamp type) stronger for filtering?
1. Delta Lake's data skipping
Delta Lake keeps the Min (smallest) and Max (largest) values of each column per file as statistics.
- Timestamp: being numeric, a query like WHERE time > '2026-03-01' lets the engine decide instantly that "this file's max is 2026-02-28, so it need not be read" (skipping).
- Strings: min/max comparison happens as strings, which is not only slower per comparison but also makes optimizing filters that involve time arithmetic ("1 hour ago" etc.) difficult.
2. Partitioning and Z-Order
When time is the main filter axis, you typically convert to a Date column for partitioning, or apply Z-Order or Liquid Clustering to the Timestamp column.
These physically order data by numeric order, so they cannot perform well unless the column type matches.
Example SQL filtering a Timestamp column efficiently under varied conditions:
-- Specify an explicit period
SELECT * FROM orders
WHERE order_timestamp BETWEEN '2026-01-01 00:00:00' AND '2026-01-31 23:59:59';
-- Simple comparison operators
SELECT * FROM orders
WHERE order_timestamp >= '2026-03-01';
What is a shuffle partition?
| Item | Description |
|---|---|
| When it arises | During wide transformations (Join, GroupBy, Distinct, ...) |
| Default value | 200 (in open-source Spark) |
| Config parameter | spark.sql.shuffle.partitions |
| Role | Determines task parallelism after a shuffle |
| Impact | Too few → out-of-memory (OOM); too many → overhead |
Why is shuffling needed at all?
Say you want to aggregate sales data, scattered across nodes, per region: all data for the same region must be gathered in one place. That process of moving data across the network and re-placing it is the shuffle.
How many chunks (partitions) the moved data is split into is what the shuffle partition setting decides; a small sketch of tuning it follows.
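A minimal sketch of adjusting the setting per workload (the value 64 is purely illustrative):
# Lower the post-shuffle parallelism for a modest data volume
spark.conf.set("spark.sql.shuffle.partitions", 64)

# The groupBy below now produces 64 post-shuffle partitions
df = spark.table("sales").groupBy("region").count()

Note that with Adaptive Query Execution enabled (see the AQE section later in this article), Spark can coalesce post-shuffle partitions automatically, which reduces the need to hand-tune this value.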
Can Liquid Clustering's keys (what would have been partition/sort keys) still be changed mid-flight?
Yes. They can be changed at any time, as often as you like.
Regarding Delta Sharing, does the open-share feature only allow sharing static Delta tables? What about notebooks?
Open Sharing is for sharing outside of Databricks.
| Asset type | Open Sharing (external) | Databricks-to-Databricks |
|---|---|---|
| Delta tables | ✓ shareable | ✓ shareable |
| Views | ✓ shareable (via Unity Catalog) | ✓ shareable |
| Volumes | ✓ shareable (unstructured files) | ✓ shareable |
| Notebooks | ✗ not shareable | ✓ shareable |
| Models (MLflow) | ✗ not shareable | ✓ shareable |
Open Sharing procedure
- Send the activation link: when the data provider configures a share, a one-time activation link (URL) is issued; send it to the recipient by email or similar.
- Download the credential file (JSON): clicking the link lets the recipient download a credential file (config.share) in JSON form. Note: the link has an expiry and allows only a single download.
- Connect from an open-source client: the recipient loads that JSON file into a Delta Sharing-capable client such as Python, pandas, Power BI, Apache Spark, or Tableau, and can then query the data as if it were local (a client-side sketch follows this list).
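A minimal sketch of the recipient side using the open-source delta-sharing Python client (the share, schema, and table names are illustrative):
import delta_sharing

# Path to the downloaded credential file
profile = "/path/to/config.share"

# "<share>.<schema>.<table>" addresses one shared table
df = delta_sharing.load_as_pandas(f"{profile}#my_share.my_schema.orders")
print(df.head())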
When data is written out, is there the issue that UC mask functions do not mask the stored data?
Yes, exactly.
Unity Catalog (UC) column masks (masking functions) hide data dynamically at read time (query execution); they do not rewrite the physical data stored as Delta files on S3 or ADLS.
What is a materialized view?
A materialized view is, simply put, a table that pre-computes a query's result and stores it physically.
| Aspect | Regular view | Materialized view (MV) | Regular table |
|---|---|---|---|
| Materialized data | None (definition only) | Yes (physically stored) | Yes (physically stored) |
| Read speed | Slow (recomputed every time) | Fast (precomputed) | Fast |
| Freshness | Always current | Needs a refresh | Manually updated |
| Compute cost | Incurred on every read | Incurred at refresh time | Incurred at write time |
Code comparison
Regular view
-- Definition only (no data is stored)
CREATE VIEW main.default.high_value_orders_view AS
SELECT order_id, customer_id, amount
FROM main.default.orders
WHERE amount > 1000;
Materialized view
-- Computes the data and stores it physically
CREATE MATERIALIZED VIEW main.default.high_value_orders_mv AS
SELECT order_id, customer_id, amount
FROM main.default.orders
WHERE amount > 1000;
-- To bring it up to date, a refresh is needed
REFRESH MATERIALIZED VIEW main.default.high_value_orders_mv;
DLT-style regular view
import dlt
@dlt.view
def temporary_filter_view():
    # A temporary view valid only within the pipeline
    # It is not registered in the catalog
return dlt.read("raw_data").filter("id IS NOT NULL")
DLT-style materialized view
import dlt
@dlt.table(
name="sales_summary_mv",
    comment="Materialized view holding aggregated results"
)
def sales_summary():
    # dlt.read() reads the full upstream data and aggregates it
    # The result is stored as a physical table
return (
dlt.read("cleaned_sales")
.groupBy("region")
.sum("amount")
)
A concrete picture of the behavior
For example, producing a per-day sales summary from tens of millions of order-log rows:
- Create: CREATE MATERIALIZED VIEW daily_sales AS SELECT date, sum(amount) FROM orders GROUP BY date;
- Physical storage: Databricks stores that aggregation result as a Delta table.
- Read: when a user runs SELECT * FROM daily_sales, the tens of millions of rows are not recomputed; only the already-aggregated thousands of rows are read.
- Refresh: when new order data arrives, run REFRESH MATERIALIZED VIEW daily_sales (or schedule it automatically) to bring the contents up to date.
Points for the exam and practice
- "When should an MV be used?":
  - The base table is huge.
  - The transformation is heavy.
  - Query frequency is high (BI dashboards etc.).
- The freshness trade-off:
  - An MV can serve stale data until it is refreshed. If you need second-level real-time freshness, consider a regular view or a streaming table instead.
- Management under Unity Catalog:
  - Recent Databricks recommends creating MVs in Unity Catalog, which lets other BI tools access the fast MVs as well.
Do materialized views pair well with complex financial functions and the like?
They pair very well.
Complex financial processing (currency conversion, intricate interest calculations, tax computation, etc.) is compute-heavy and demands maintainable code, which maximizes the benefit of materialized views (MVs).
Think of it as pre-paying the compute cost.
If column masking per user group is implemented with a UDF, what does the implementation look like?
- Create the mask function (SQL UDF)
CREATE OR REPLACE FUNCTION email_mask(email STRING)
RETURN CASE
  -- HR group: see the value as-is
WHEN is_account_group_member('hr_users') THEN email
  -- Regular users: hide everything before the domain (e.g., ***@example.com)
WHEN is_account_group_member('standard_users') THEN regexp_replace(email, '^.*(?=@)', '***')
  -- Everyone else (external users etc.): fully hidden
ELSE 'REDACTED'
END;
- Apply the mask to a table column
-- When creating the table
CREATE TABLE customers (
  user_id INT,
  email STRING MASK email_mask,  -- bind the function here
  full_name STRING
);
-- Applying to an existing table afterwards
ALTER TABLE customers ALTER COLUMN email SET MASK email_mask;
If permissions are set at the catalog level in UC, what happens to permissions on existing tables and on schemas created later? (Do they cascade/inherit?)
Yes. Unity Catalog (UC) privileges cascade (are inherited).
1. Metastore
└── 2. Catalog ← grant here, and...
      └── 3. Schema ← inherits automatically
            └── 4. Tables / views / volumes ← inherit automatically
3. 泚æç¹ïŒç¶æ¿ã®ãäžæžããããæåŠãã¯ã§ããïŒ
å®åã詊éšã§æ··ä¹±ãããããã€ã³ããããã€ããããŸãã
- DENY ã¯ç¶æ¿ãããåªå ãããŸããäŸãã°ãã«ã¿ãã°ã¬ãã«ã§ SELECT æš©éããã£ãŠããç¹å®ã®ã¹ããŒãã«å¯Ÿã㊠DENY SELECT ãèšå®ããã°ããã®ãŠãŒã¶ãŒã¯ãã®ã¹ããŒããèŠãããšãã§ããªããªããŸãã
- ãäŸå€çã«äžéšã®ã¹ããŒãã ãé ããã¯é£ãã:
ã«ã¿ãã°ã¬ãã«ã§SELECTãäžããŠããŸããšããã®ã«ã¿ãã°å ã®ç¹å®ã®ã¹ããŒãã ããèŠããªãããã«ããããšã¯ã§ããŸããã- 察ç: æš©éãçµãããå Žåã¯ãã«ã¿ãã°ã¬ãã«ã§ã¯ãªããåå¥ã®ã¹ããŒãã¬ãã«ã§æš©éãä»äžããã®ããã¹ããã©ã¯ãã£ã¹ã§ãã
- OwnershipïŒæææš©ïŒã®ç¶æ¿:
æææš©ïŒOWNERïŒã¯ç¶æ¿ãããŸãããã«ã¿ãã°ã®ææè ã§ãã£ãŠããä»äººãäœã£ãã¹ããŒãã®ææè ã«ã¯ãªããŸããããã«ã¿ãã°ã¬ãã«ã®åŒ·åãªæš©éïŒCREATEãªã©ïŒã«ãã£ãŠç®¡çã¯å¯èœã§ãã
Does Auto Loader have format categories like "image", "binary", "text"?
No. In Auto Loader (cloudFiles) there are no dedicated format categories called "image" or "binary".
The file formats Auto Loader supports directly are primarily structured and semi-structured data (JSON, CSV, Parquet, etc.). To handle images or other binary files, the common approach is to combine it with the binaryFile format, as sketched below.
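A minimal sketch of ingesting image files through Auto Loader with the binaryFile format (the path is illustrative):
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")  # read raw bytes plus file metadata
    .option("pathGlobFilter", "*.png")          # optionally restrict to image files
    .load("/mnt/raw/images/"))
# The resulting columns include path, modificationTime, length, and content (the bytes)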
What are SCD Type 1 vs Type 2?
SCD (Slowly Changing Dimension) is unavoidable in the world of data engineering.
In one line, it is the strategy question: when a master-data value changes, what do we do with its history?
SCD Type 1: overwrite (no history kept)
The "never look back" style: the existing record is simply overwritten with the new value.
- Behavior: UPDATE only.
- Use: fixing typos in names, or fields where past values are never needed (e.g., email address changes).
- Pros: minimal data volume; very simple queries.
- Cons: analyses like "what was it a month ago?" become impossible.
SCD Type 2: keep history (add new rows)
The "the past matters" style: every change adds a new row, tracking which data was valid from when to when.
- Behavior: "expire" the existing row and INSERT a new one.
- Management columns: typically add start_date (valid from), end_date (valid to), and is_current (latest flag).
- Use: addresses (needed for regional sales analysis), job titles, product prices, and other attributes whose changes over time you must track.
- Pros: the state at any point in the past can be reproduced exactly.
- Cons: data volume swells; joins and queries get more complex.
| Aspect | SCD Type 1 | SCD Type 2 |
|---|---|---|
| Data operation | Overwrite | Add a new row |
| History kept | No | Yes (complete history) |
| Table size | Unchanged | Keeps growing |
| Complexity | Low (simple updates) | High (date management needed) |
| Main use | Fixing mistakes, low-importance updates | Audit requirements, time-series analysis, trend studies |
What is CDC data?
CDC (Change Data Capture) is the technique of extracting only a database's change history, the inserts, updates, and deletes, and syncing it to another system.
It is closely related to the Auto Loader and SCD Type 2 topics discussed earlier.
Why is CDC needed?
Say you want to sync a customers table from a core MySQL system into Databricks.
- The traditional way (full load): pull all tens of millions of rows every day.
  - → Heavy load on the database, and it takes hours.
- The CDC way: follow only log entries like "at 10:00 today, A's address changed."
  - → Minimal load, near-real-time sync.
The shape of CDC data (illustration)
Sent through a CDC tool (such as Debezium), the data usually arrives in a metadata-annotated form like this:
| Change (Op) | Customer ID | Name | Address | Updated at |
|---|---|---|---|---|
| I (Insert) | 101 | Tanaka | Tokyo | 09:00 |
| U (Update) | 101 | Tanaka | Osaka | 10:30 |
| D (Delete) | 105 | Sato | NULL | 11:00 |
Strong CDC allies in Databricks
CDC data is just a pile of changes; as-is it cannot serve as a table (it can't answer "what is Tanaka's current address?"). So we use the following features.
1. APPLY CHANGES INTO (Delta Live Tables)
The strongest tool. It reads the I/U/D flags arriving via CDC and automatically applies SCD Type 1 or Type 2 to the target Delta table.
2. The MERGE statement
To apply CDC data manually in SQL or Python, use MERGE: "update if matched, insert if not" in a single transaction.
Here is ready-to-use SQL for handling SCD/CDC in Databricks (Delta Lake).
There are two main patterns: MERGE INTO run manually, and APPLY CHANGES INTO automated by Delta Live Tables (DLT).
1. Manual: MERGE INTO (SCD Type 1)
The most common operation: update existing data if present, insert otherwise.
MERGE INTO customers AS target
USING customer_updates AS source
ON target.user_id = source.user_id
-- 1. If matched: overwrite with the new values (SCD Type 1)
WHEN MATCHED THEN
UPDATE SET
target.email = source.email,
target.address = source.address,
target.updated_at = source.updated_at
-- 2. If not matched: insert as new
WHEN NOT MATCHED THEN
INSERT (user_id, name, email, address, updated_at)
VALUES (source.user_id, source.name, source.email, source.address, source.updated_at);
2. Automated with DLT: APPLY CHANGES INTO
When processing CDC data, this syntax is the Databricks best practice.
SCD Type 1 (overwrite)
CREATE OR REFRESH STREAMING TABLE target_table;
APPLY CHANGES INTO LIVE.target_table
FROM STREAM(LIVE.source_cdc_stream)
KEYS (user_id)                 -- primary key
SEQUENCE BY updated_at         -- column that guarantees ordering
COLUMNS * EXCEPT (event_type)  -- columns to exclude, if any
STORED AS SCD TYPE 1;          -- Type 1 (no history)
SCD Type 2 (keep history)
Change only the last line, and history-management columns such as __start_at and __end_at are generated automatically.
APPLY CHANGES INTO LIVE.target_table
FROM STREAM(LIVE.source_cdc_stream)
KEYS (user_id)
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;  -- Type 2 (keep history)
A practical use of Change Data Feed
Change Data Feed (CDF) is a feature that exposes the changes applied to a Delta table (inserts, pre- and post-update values, deletes) as a queryable log.
Rather than just looking at the latest state, it is used to pass "what changed and how" to downstream processing efficiently.
Three steps to use CDF
1. Enable CDF on the table
- Enable at table creation:
CREATE TABLE silver_users (
  id INT, name STRING, city STRING
) TBLPROPERTIES (delta.enableChangeDataFeed = true);
- Enable on an existing table:
ALTER TABLE silver_users SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
2. Change the data
Run INSERTs and UPDATEs as usual.
SQL
UPDATE silver_users SET city = 'Tokyo' WHERE id = 101;
3. Read only the changes (SQL / Python)
This is the heart of CDF: you can request just the delta between specific versions or time windows.
In SQL:
- Changes from version 1 to the latest:
SELECT * FROM table_changes('silver_users', 1);
- Changes within a time range:
SELECT * FROM table_changes('silver_users', '2026-03-04 10:00:00', '2026-03-04 12:00:00');
A Python equivalent is sketched below.
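A minimal sketch of the DataFrame-API equivalent:
# Read the change feed starting from table version 1
changes = (spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("silver_users"))

# CDF adds metadata columns such as _change_type, _commit_version, _commit_timestamp
changes.select("id", "city", "_change_type", "_commit_version").show()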
What is a watermark, with a usage example
A watermark in streaming defines the boundary of how long to wait for late-arriving data.
from pyspark.sql import functions as F
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/datapath"))
# Set the watermark: allow 5 minutes of delay relative to event_time
windowed_counts = (df
.withWatermark("event_time", "5 minutes")
.groupBy(
F.window("event_time", "10 minutes")
)
.count())
Behavior under this setting
- A 20:00-20:10 window is being aggregated.
- Data generated at 20:05 arrives at 20:09 → included in the aggregation.
- Data generated at 20:02 arrives at 20:12 → more than 5 minutes late, but may still get in if the watermark has not yet advanced (details below).
- After the system processes data from 20:15 and the watermark advances to 20:10, data from 20:04 arrives → ignored entirely.
SQL that raises the following alert, using watermarks, WITH clauses, windows, and whatever else is needed
Conditions:
- At most one alert per day.
- While the condition persists, alert again every day.
- Never more than one alert in a single day.
- The condition: the 3-day consecutive average temperature is 5 degrees or higher.
-- 1. Streaming table computing the daily average temperature (Bronze -> Silver)
-- Aggregates down to one row per day; the watermark handles late data
WITH daily_summary AS (
SELECT
window.start AS alert_date,
AVG(temperature) AS avg_daily_temp
FROM STREAM(sensor_data)
    -- Tolerate at most 1 day of data lateness
    WATERMARK event_time DELAY OF INTERVAL 1 DAY
    -- Tumbling 1-day windows (this reduces the data to one record per day)
GROUP BY window(event_time, '1 day')
),
-- 2. Compute the 3-day moving average (Silver -> Gold equivalent)
moving_average_stats AS (
SELECT
alert_date,
avg_daily_temp,
        -- Average over the previous 2 rows plus the current row (3 days total)
AVG(avg_daily_temp) OVER (
ORDER BY alert_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS three_day_avg,
        -- Count used to verify we actually have 3 days of data
COUNT(avg_daily_temp) OVER (
ORDER BY alert_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS day_count
FROM daily_summary
)
-- 3. Final alert decision
SELECT
alert_date,
three_day_avg,
'ALERT: 3-day consecutive average temperature >= 5.0' AS alert_message
FROM moving_average_stats
WHERE
    -- 3-day average is 5 degrees or higher
three_day_avg >= 5
    -- Guard against false positives on days 1-2 right after launch: require 3 full days
AND day_count = 3
Coalesce vs Repartition, and what is Coalesce(1)?
Coalesce(1) is, in one line, the operation of squeezing data scattered everywhere into a single box (partition).
In practical data engineering, Coalesce(1) shows up in essentially one scenario: writing the final result as a single output file.
1. What does Coalesce(1) do?
In Apache Spark, data normally lives distributed across multiple worker nodes. Running Coalesce(1) makes Spark gather all the data onto one node, into one partition.
Key property: it keeps shuffling to a minimum.
The decisive difference from Repartition is that it avoids a full shuffle (an indiscriminate redistribution of all data). Coalesce only merges existing partitions, so data movement is minimized.
| Aspect | Coalesce | Repartition |
|---|---|---|
| Partition count | Can only decrease | Can increase or decrease |
| Shuffle | Basically none (minimal) | Always occurs (full shuffle) |
| Data balance | Skew can appear | Evenly redistributed |
| Main use | Reducing file count at the end of a job | Raising parallelism to speed up processing |
| Cost | Low (efficient) | High (heavy network load) |
3. Why use Coalesce(1)? (A practical example)
The most common case is an external system (a CSV to be opened in Excel, a config file for a specific tool) that only accepts a single file.
# Prevent hundreds of part files; produce a single result.csv
df.coalesce(1).write.format("csv").save("/path/output")
How should a wheel file be installed when there is no internet access?
In an environment that cannot reach the internet (air-gapped, or behind a strict proxy), the modern, smartest approach in Databricks is to place the Python wheel file (.whl) in Unity Catalog Volumes or in Workspace Files and install from there.
The recommended approaches by environment and purpose:
1. Use Unity Catalog Volumes (recommended: most secure and modern)
If Unity Catalog is enabled, Volumes should serve as your internal artifact repository.
- Steps:
  - Upload the local .whl file via the UI or CLI to a Volumes path (e.g., /Volumes/main/default/my_lib_vol/).
  - At the top of the notebook, run (Python):
%pip install /Volumes/main/default/my_lib_vol/my_package-0.1-py3-none-any.whl
- Benefit: access control (GRANT) applies, so only specific teams can be allowed to use the library.
Complex financial UDFs × Pandas on Spark: how to use them well
When each group (e.g., per country or per currency) needs a complex computation, applyInPandas (a grouped-map UDF) is the best tool; a sketch follows.
- Why: a regular UDF processes one row at a time and is slow. applyInPandas hands each group to the Python process as a single Pandas DataFrame, enabling vectorized (batch) computation.
- Benefit: you keep using financial libraries (numpy, existing Python logic) as-is while Spark parallelizes across groups.
Where should you look at query execution time?
- Query Profile (SQL browser): the place to see visually which operators (Join, Filter) took time and whether data skipping worked. Indispensable for SQL tuning.
- Job run history / Spark UI: suited to checking infrastructure-side behavior, such as "is one specific task stuck (skew)?" or "is network transfer (shuffle) slow?"
What is a Shuffle Hash Join?
A join strategy that shuffles (redistributes) both tables across all nodes by join key, then builds a hash table on each node to perform the join.
- When to use: effective when the per-partition sort cost of running a Sort Merge Join would be too high, and one table's partitions fit into executor memory as hash tables.
In general, for joins between two very large tables, Sort Merge Join is the most stable choice.
Avoiding data skew
- Tuning maxPartitionBytes: shrinking the per-partition size so tasks spread more finely can help.
- Bigger instances: more memory prevents OOM (out of memory), but does not fundamentally fix skew (one core doing all the work).
- Salting: ultimately the most fundamental fix.
The command for attaching tags
Attaching tags to tables and columns is done with the ALTER TABLE command (SQL).
ALTER TABLE table_name SET TAGS ('department' = 'finance', 'priority' = 'high');
Note: sql.conf-style settings change per-session configuration and cannot attach metadata (tags).
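The statement above tags the table itself. A minimal sketch of the column-level variant, run through spark.sql so it stays in Python (the column name is illustrative):
# Tag a single column rather than the whole table
spark.sql("""
    ALTER TABLE table_name
    ALTER COLUMN email SET TAGS ('pii' = 'true')
""")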
Is there a feature where AI auto-generates column descriptions?
Yes. Unity Catalog (UC) provides "AI-generated documentation".
Pressing the "Generate" button in Catalog Explorer makes the AI infer from the table name and its contents, drafting the column descriptions for you.
Does Databricks' AI-generated documentation add columns to the table?
No. It never physically adds new columns to a table.
The AI-generated text is written into the following Unity Catalog fields:
- Table comment: an overview of the whole table.
- Column comments: the meaning and contents of each column.
Does Delta guarantee freshness for external tables?
- Managed tables: fully managed by Databricks, so always current.
- External tables: if the data is in Delta format, consistency with VACUUM and OPTIMIZE is maintained even when other engines write to it. However, if you reference non-Delta external files (Parquet etc.) directly, newly added files may be invisible until you run REFRESH TABLE.
Are there settings like "Standard" and "Dedicated"?
No. The correct tiers are:
- Serverless: no resource management, starts instantly (recommended).
- Pro: the standard configuration suited to workflows and the like.
- Classic: the most basic configuration (older feature set).
Also, what does the @dlt notation mean?
In Databricks data engineering, @dlt (more precisely, the Python decorators @dlt.table / @dlt.view) is the declarative marker used by the pipeline-specific framework Delta Live Tables (DLT).
Put simply, it tells the system: "the function (or SQL statement) I am about to write is not a plain query; it is a table managed as part of a DLT pipeline."
The main roles of @dlt
When a DLT pipeline runs, attaching this decorator automates the following:
- Automatic dependency resolution (DAG construction): DLT analyzes which tables reference which and builds the execution plan (graph) that processes data in the correct order.
- Automatic schema management: if the data's structure changes, the target Delta table's schema is applied and updated automatically.
- Checkpoints and retries: progress is recorded, so even if processing stops midway there is no need to restart from scratch.
import dlt
@dlt.table(
name="raw_player_logs",
    comment="raw game log data"
)
def raw_player_logs():
return spark.readStream.format("cloudFiles").load("/path/to/logs")
The SQL equivalent:
CREATE OR REFRESH LIVE TABLE raw_player_logs
AS SELECT * FROM read_files("/path/to/logs")
What is CREATE OR REFRESH LIVE TABLE raw_player_logs?
In SQL, CREATE OR REFRESH LIVE TABLE is syntax specific to the Delta Live Tables (DLT) framework.
It differs decisively from regular (Spark) SQL's CREATE TABLE in that it reflects a declarative mindset. In one line, it is the instruction: "keep this table always up to date; I leave the procedure to you."
Let's dissect what each word means.
1. What each keyword does
- CREATE OR REFRESH
  "Create the table if it doesn't exist; if it already does, refresh its contents with the latest data." You no longer write INSERT or UPDATE by hand.
- LIVE
  The most important part. It marks the table as part of a DLT pipeline: unlike a regular table, it gets dependency management and automatic retries behind the scenes.
- TABLE
  Stores the result as a physical Delta table (by contrast, for a transient computation you would use LIVE VIEW).
2. Why use it? (Benefits for the data engineer)
In day-to-day Databricks work, all of the following tedious tasks are automated:
- Automatic incremental updates: when new logs (e.g., gameplay records) are added to the source, the continuation from the previous run is read automatically.
- Dependency resolution: if a Gold table references a Silver table, DLT works out on its own that Gold must run after Silver is refreshed.
- Schema evolution: if a game update adds columns to the logs, it can adapt automatically depending on your schema evolution settings.
For spark.readStream.format("cloudFiles"), what options exist other than cloudFiles?
| Option | Reads | Main benefit | Drawback |
|---|---|---|---|
| cloudFiles | Files in cloud storage | Schema inference, scalability | Databricks-specific feature |
| delta | Delta tables | Extremely fast, exact exactly-once processing | Source data must be in Delta format |
| parquet / csv | Standard files | Highly portable | Performance degrades as file counts grow |
| kafka / kinesis | Event streams | Ultra-low latency | Data retention limits apply |
WINDOW颿°ãš.windowã¯å¥ç©ïŒ
ããããããF.window()ãšãããŸãå¥ã®ãã®ãããã
1. SQL ã® WINDOW 颿°
æšæºçãªSQLã§ãç¹å®ã®ç¯å²ïŒããŒãã£ã·ã§ã³ïŒã«å¯ŸããŠèšç®ãè¡ãããã®æ§æã§ãã
- èšè¿°å Žæ:
SELECTæã®äžã - ç¹åŸŽ:
OVER (PARTITION BY ... ORDER BY ...)ãšããå¥ã䜿ããŸãã
SELECT user_id, score, -- ãããSQLã®WINDOW颿° AVG(score) OVER (PARTITION BY user_id ORDER BY date) as rolling_avg FROM game_logs
2. The DataFrame API's Window (PySpark)
A way to define, in Python code, the same computation frame as SQL's OVER clause.
- Where written: Python code (the pyspark.sql.Window module).
- Characteristic: first build the window definition, then pass it to the over() method.
from pyspark.sql import Window
import pyspark.sql.functions as F
# 1. Define the window (WindowSpec)
w = Window.partitionBy("user_id").orderBy("date")
# 2. 颿°ã«é©çšãã
df.withColumn("rolling_avg", F.avg("score").over(w))
| Aspect | SQL (WINDOW function) | PySpark (Window) |
|---|---|---|
| Readability | Intuitive if you know SQL | Definable as a variable, hence reusable |
| Dynamic manipulation | Requires string manipulation | Easy to handle via Python variables and branching |
| What you hold | The column value itself | A WindowSpec object (a definition) |
| Item | Window object (WINDOW function) | F.window() function (time windowing) |
|---|---|---|
| Main use | Keep each row and attach surrounding statistics | Aggregate data into fixed time buckets |
| Paired construct | .over(window_spec) | groupBy(F.window(...)) |
| Resulting row count | Unchanged (same rows as input) | Reduced (one row per time bucket) |
| Parameters | partitionBy, orderBy, rows/rangeBetween | windowColumn, windowDuration, slideDuration |
| Typical examples | Moving average over the last 3 records, ranking | Logins per 5 minutes, sales per hour |
Yes: for time-based grouping, F.window() is overwhelmingly the more convenient and powerful choice.
A concrete picture of F.window():
import pyspark.sql.functions as F
# Aggregate data into 10-minute buckets
df_aggregated = df.groupBy(
F.window(F.col("event_time"), "10 minutes")
).agg(
F.count("user_id").alias("login_count")
)
Scenario: cleansing game logs
Suppose we want to run three processing steps:
- Exclude test users (filter_test_users)
- Normalize scores (normalize_score)
- Flag impossible movement speeds (add_cheater_flag)
An example where Spark's transform shines
1. Without .transform() (variable soup)
Intermediate variables pile up and the code gets hard to read.
# Function definitions
def filter_test_users(df):
return df.filter(df.user_id != "test_user")
def normalize_score(df):
return df.withColumn("score_norm", df.score / 100)
def add_cheater_flag(df):
return df.withColumn("is_cheater", df.speed > 500)
# Execution (variables everywhere)
df1 = filter_test_users(raw_df)
df2 = normalize_score(df1)
final_df = add_cheater_flag(df2)
Drawbacks: throwaway variables like df1 and df2 pile up, and reordering the steps or commenting one out is a pain.
2. With .transform() (clean!)
Method chaining lets it read top to bottom.
# Execution
final_df = (raw_df
.transform(filter_test_users)
.transform(normalize_score)
.transform(add_cheater_flag)
)
The convenience of dlt.read_stream
| Aspect | Standard Spark (spark.readStream) | DLT (dlt.read_stream) |
|---|---|---|
| Checkpoint management | Manual: you must specify and manage paths | Automatic: DLT manages them behind the scenes |
| Schema inference/evolution | Strict definitions; evolution needs configuration | Flexible: new columns can be detected automatically |
| Dependency resolution | Manual: you must control execution order | Automatic: resolved via the dependency graph between tables |
| Retries/recovery | You write your own restart logic | Automatic: pipelines self-heal |
| Visualization | No data-flow view in the UI | Lineage visible in the GUI |
- How the code and effort differ
# A typical standard-Spark version
df = (spark.readStream
.format("cloudFiles") # Auto Loader
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoints/schema")
.load("/input/path"))
(df.writeStream
.format("delta")
    .option("checkpointLocation", "/checkpoints/data")  # must be specified every time
.trigger(availableNow=True)
.table("target_table"))
import dlt
@dlt.table
def target_table():
    return dlt.read_stream("source_table")  # read the upstream DLT table; dependencies and state are handled for you
3. Three reasons dlt.read_stream is overwhelmingly convenient
① Freedom from checkpoint hell
With standard Spark you must manage a checkpointLocation per table; get it wrong and data duplicates or breaks. DLT manages this centrally as pipeline configuration, so there is no need to write it in individual code.
② Automatic lineage (data-flow) generation
With dlt.read_stream, Databricks automatically analyzes the connections between tables and draws a clean graph for you.
Seeing at a glance in the GUI which table is stuck or where records dried up feels like debugging a game in development, instantly pinpointing which flag caused the bug.
⢠ããŒã¿å質管çïŒExpectationsïŒãšã®çµ±å
ååã®è³ªåã«ãã£ã expect ãªã©ã®å¶çŽã¯ãdlt.read_stream ãšã»ããã§äœ¿ãããšã§ç䟡ãçºæ®ããŸããã¹ããªãŒãã³ã°ã§æµããŠããããŒã¿ã«å¯ŸããŠãããã¡ãªããŒã¿ã¯èœãšãããšããåŠçã1è¡æ·»ããã ãã§å®è£
ã§ããã®ã¯DLTãªãã§ã¯ã®ç¹æš©ã§ãã
Commands to apply Liquid Clustering to a table
1. When creating a new table
Use the CLUSTER BY clause.
SQL
CREATE TABLE my_catalog.my_schema.game_logs (
event_id STRING,
user_id STRING,
event_time TIMESTAMP,
event_type STRING
)
CLUSTER BY (user_id, event_type); -- specify the clustering columns here
PySpark (DataFrame API)
(df.write
.format("delta")
    .clusterBy("user_id", "event_type")  # specify the clustering columns
.saveAsTable("my_catalog.my_schema.game_logs"))
Command to create a materialized view
import dlt
from pyspark.sql import functions as F
@dlt.table(
name="stage_stats_mv",
    comment="Per-stage aggregates (materialized view)"
)
def stage_stats_mv():
return (
dlt.read("raw_game_logs")
.groupBy("stage_id")
.agg(
F.count("user_id").alias("play_count"),
F.avg("clear_time").alias("avg_time")
)
)
A concrete code example using materialized views to make complex financial processing efficient
import dlt
from pyspark.sql import functions as F
# 1. Intermediate MV doing currency conversion and tax calculation (Silver layer)
@dlt.table(
name="financial_transactions_cleaned",
    comment="Line-item data with currency conversion and tax applied"
)
def transactions_cleaned():
return (
dlt.read("raw_sales")
.join(dlt.read("exchange_rates"), "currency_code")
.withColumn("amount_jpy", F.col("amount") * F.col("exchange_rate"))
        .withColumn("tax_amount", F.col("amount_jpy") * 0.10)  # simplified 10% tax calculation
.withColumn("net_amount", F.col("amount_jpy") + F.col("tax_amount"))
)
# 2. Monthly aggregation MV for the executive dashboard (Gold layer)
# This is where compute gets amortized: the heavy GROUP BY is precomputed and materialized
@dlt.table(
name="monthly_financial_summary_mv",
    comment="Monthly financial summary by department (materialized view)",
table_properties={"quality": "gold"}
)
def monthly_summary():
return (
dlt.read("financial_transactions_cleaned")
.groupBy(
F.window(F.col("transaction_timestamp"), "1 month").alias("month"),
"department_id",
"region"
)
.agg(
F.sum("amount_jpy").alias("total_sales_jpy"),
F.sum("tax_amount").alias("total_tax_jpy"),
F.sum("net_amount").alias("total_net_profit"),
F.count("transaction_id").alias("transaction_count")
)
)
Why this structure is efficient
- Avoiding recomputation
  When BI tools (Tableau, Databricks SQL) query the data, they would otherwise redo the currency conversion and a month of aggregation every time. Since they read the precomputed monthly_financial_summary_mv instead, response times drop to seconds or less.
- Automatic dependency management
  Because dlt.read() is used, updating the source raw_sales (raw data) or exchange_rates (FX rates) makes the DLT pipeline recompute (refresh) this MV automatically.
- Shared computation
  When logic such as tax calculation is shared across departments, computing it once in a single MV prevents each department's queries from drifting subtly apart.
Shallow Clone vs Deep Clone
| Aspect | Shallow Clone | Deep Clone |
|---|---|---|
| Data payload | Not copied (metadata only) | All data copied |
| Dependency | Depends on the source data | Fully independent |
| Cost / speed | Very fast, near-zero storage | Takes time, double the storage |
| Exam trap | Running VACUUM on the source table can break a shallow clone by removing the files it references | Deleting the source table does not affect a deep clone |
| Recommended for | Short-lived tests, dev-environment validation | Production backups, cross-region migration |
Job Parameters vs Task Values
These deepen the understanding of passing values within a job (workflow).
- Job Parameters:
  - Constant-like values passed when the whole job starts.
  - Retrieved via dbutils.widgets.get() etc.; shared by all tasks in the job.
- Task Values:
  - For dynamic passing: "use task A's result in task B."
  - Saved with dbutils.jobs.taskValues.set(key="my_key", value=123) and read with get in downstream tasks; a sketch follows the example below.
  - Exam point: expect questions like "what is the best way to share dynamic status or computed results between tasks?"
Example
- Parameter Key: process_date
- Parameter Value: 2026-03-12
# Retrieve the parameter
process_date = dbutils.widgets.get("process_date")
# Filter data using the retrieved date
df = spark.table("raw_sales").filter(f"order_date = '{process_date}'")
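And a minimal sketch of Task Values across two tasks (the key name and task name are illustrative):
# In task A: publish a computed value
row_count = spark.table("raw_sales").count()
dbutils.jobs.taskValues.set(key="row_count", value=row_count)

# In task B: read task A's value (taskKey must match the upstream task's name)
upstream_count = dbutils.jobs.taskValues.get(taskKey="task_a", key="row_count", default=0)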
Adaptive Query Execution
A marquee feature since Spark 3.0: it rewrites the query plan dynamically at runtime based on observed statistics.
The "three AQE magics" the exam asks about (the related config knobs are sketched after this list):
- Coalescing post-shuffle partitions: automatically merges overly fine post-shuffle partitions, preventing file-count explosions.
- Switching join strategies: if one table turns out unexpectedly small at runtime, switches on the fly from Sort Merge Join to Broadcast Hash Join.
- Optimizing skew joins: detects data skew and splits heavy partitions into finer pieces to even out the work.
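A minimal sketch of the related configuration flags (enabled by default on recent runtimes; shown only to make the knobs concrete):
spark.conf.set("spark.sql.adaptive.enabled", "true")                      # AQE itself
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # magic no. 1
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # magic no. 3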
The is_member() function
- Example implementation:
CREATE FUNCTION sales_row_filter(region STRING)
RETURN
  is_account_group_member('admin_group') OR region = 'Japan';
- Exam point: for the scenario "a specific manager sees all data, while regular staff see only their own region's data," combine this with a ROW FILTER, not MASK. A sketch of binding the filter follows.
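A minimal sketch of attaching the function above to a table as a row filter (the table and column names are illustrative), written via spark.sql:
# Rows are returned only where sales_row_filter(region) evaluates to true
spark.sql("""
    ALTER TABLE sales
    SET ROW FILTER sales_row_filter ON (region)
""")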
| Kind | Applicable words |
|---|---|
| Reserved words / keywords | CREATE, FUNCTION, RETURN, OR |
| Identifiers (user-defined) | sales_row_filter (function name), region (argument name) |
| Built-in function | is_account_group_member |
Sort Merge Join
The robust default join strategy for joining large tables.
- Steps:
  - Shuffle: redistribute data across all nodes based on the join key.
  - Sort: sort the data by key within each node.
  - Merge: zip the sorted streams together from the top.
- Exam points:
  - The last resort when Broadcast Hash Join cannot be used (both sides too big).
  - The shuffle cost is high, so pre-organizing data with Z-Order or Liquid Clustering where possible makes its scans and sorts more efficient.
  - The spark.sql.shuffle.partitions setting directly affects this join's performance.