About this article
Databricks projects have been coming my way more often lately, so while re-studying I organized the pieces of knowledge I wasn't confident about.
I think this material is useful for certification exams and similar, so if you're interested, please make use of it.
Article overview
This article covers the following points.
ð» Development environment and interfaces
- Library management
  - Install packages at notebook scope with the %pip magic command.
- Identifying notebooks
  - The special header written on the first line: "# Databricks notebook source".
- Storage differences
  - DBFS root: workspace-dedicated, temporary storage.
  - External mounts (Mount): user-managed external storage such as S3/ADLS.
- Interactive tools (widgets)
  - Four types — text, dropdown, combobox, multiselect — for receiving parameters dynamically.
- Creating views
  - Setting aliases (AS) provides consumer-friendly naming while preserving the original data structure.
- Security
  - Handling Secrets: operating so they are not accidentally exposed through iteration and the like.
â¡ Performance and architecture
- Using the Spark UI
  - Check disk-write activity and cache state on the "Stages" and "Storage" tabs.
- Making processing efficient
  - Understand the limits of %sh (single node) and move large-scale data to distributed processing with Spark.
- Optimization technique: predicate pushdown (Predicate Pushdown)
  - Apply filtering at the physical-plan stage to minimize the amount of data read.
- Faster joins: broadcast()
  - Distribute a small table to all nodes to avoid costly shuffles.
- Skew countermeasures
  - Identify data concentration on specific partitions (Data Skew) and remove processing bottlenecks.
- Cluster type: High Concurrency clusters
  - Resource isolation and optimization specialized for concurrent query execution by multiple users.
ð Streaming and CDC (Change Data Capture)
- Structured Streaming
  - A programming model that treats data as an infinitely growing table (unbounded table).
- State management (checkpointLocation)
  - Records query progress; when the schema changes, a new path must be specified.
- Stream-stream joins
  - Hold and track state until the matching record (by timestamp or key) arrives.
- Change Data Feed (CDF)
  - Extract the "delta" of inserts, updates, and deletes; how it differs from Time Travel (reconstructing past snapshots).
- Automated CDC processing
  - Detect change logs from external systems and apply them to Delta tables with APPLY CHANGES INTO and similar.
ð Job management and governance
- Run-history retention
  - Job run results are retained by default for 60 days or up to 5,000 runs.
- API usage: Jobs API
  - Use GET /api/2.1/jobs/get to inspect a multi-task job's details and configuration.
- Data protection (PII countermeasures)
  - Combine partitioning with ACLs (access control) to optimize deletion and protection using physical boundaries.
- Data quality management
  - Keep quality rules centrally in a management table outside the pipeline to enable dynamic validation.
- Delta Live Tables (DLT)
  - Pipeline automation, and controlling recomputation with the pipelines.reset.allowed property.
The article body follows.
What are Optimized Writes?
- Databricks "Optimized Writes" is a feature that, when writing data to a Delta Lake table, automatically avoids the Small File Problem — generating large numbers of tiny files — by consolidating writes into optimally sized files (typically around 128 MB).
- What problems do large numbers of small files cause?
  - Metadata-management overhead
    - File systems (S3, Azure Data Lake, etc.) access metadata every time a file is opened, to determine where the data lives.
    - 1 file of 1 GB: one metadata read.
    - 1,000,000 files of 1 KB: 1,000,000 metadata reads, which alone can take minutes.
  - I/O inefficiency
    - Reads from disk or network require setup time (seek time, connection establishment). With nothing but small files, more time is spent "opening files" than actually reading data.
  - Limits on parallelism
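The metadata overhead above is easy to quantify. A minimal back-of-the-envelope sketch in plain Python — the 128 MB target comes from the section above, while the 5 ms per metadata call is an illustrative assumption, not a measured cloud-storage figure:

```python
# Compare how many files -- and thus metadata operations -- a dataset needs
# at different file sizes.
import math

def file_count(total_bytes: int, file_size_bytes: int) -> int:
    """Number of files needed to hold total_bytes at a given file size."""
    return math.ceil(total_bytes / file_size_bytes)

GB = 1024 ** 3
KB = 1024
total = 1 * GB

small_files = file_count(total, 1 * KB)          # 1 KB files
optimized = file_count(total, 128 * 1024 ** 2)   # ~128 MB files (optimized writes)

ms_per_open = 5  # assumed metadata latency per file open
print(f"1 KB files:   {small_files:,} opens -> ~{small_files * ms_per_open / 1000:.0f}s of metadata time")
print(f"128 MB files: {optimized} opens -> ~{optimized * ms_per_open / 1000:.3f}s of metadata time")
```

The same 1 GB needs over a million opens at 1 KB per file, but only eight at 128 MB — which is the whole point of consolidating writes.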
ããŒããã㯠ã¬ãã«ã®ã¹ã³ãŒããæã€ Python ããã±ãŒãžãã€ã³ã¹ããŒã«ããæ¹æ³
- Databricksã«ãããŠãç¹å®ã®ããŒãããã¯å ã§ã®ã¿æå¹ãªPythonã©ã€ãã©ãªãã€ã³ã¹ããŒã«ããã«ã¯ã%pip ããžãã¯ã³ãã³ãã䜿çšããã®ãæšæºçãã€æãæšå¥šãããæ¹æ³ã§ãã
What is the default data-retention threshold, in days?
- 7 days
- In Delta Lake, the default retention period associated with the VACUUM command — which cleans up old files (data that has been deleted or superseded by updates) — is 7 days.
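A minimal pure-Python model of that 7-day rule (the file list and timestamps here are made up; real VACUUM also consults the table's transaction log to decide whether a file is still referenced):

```python
from datetime import datetime, timedelta

def vacuum_candidates(files, now, retention_hours=168):  # 168 h = 7 days, the default
    """Return unreferenced files older than the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return [f["path"] for f in files
            if not f["referenced"] and f["modified"] < cutoff]

now = datetime(2026, 3, 10)
files = [
    {"path": "part-001.parquet", "modified": now - timedelta(days=10), "referenced": False},
    {"path": "part-002.parquet", "modified": now - timedelta(days=2),  "referenced": False},
    {"path": "part-003.parquet", "modified": now - timedelta(days=30), "referenced": True},
]
print(vacuum_candidates(files, now))  # only the 10-day-old, unreferenced file
```

Note that both conditions must hold: the 2-day-old file is too young, and the 30-day-old file is still referenced by the current table version.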
Which two places in the Spark UI provide the main indicators that partitions are being written to disk?
- The Stages tab (and the stage detail page)
  - Selecting a specific stage shows the "Summary Metrics" and "Tasks" tables, where spill-to-disk metrics can be checked.
- The Storage tab
  - This is where you check the state of cached (.persist() / .cache()) datasets.
  - Storage Level: if DISK_ONLY or MEMORY_AND_DISK is shown here, the data is being kept on disk.
  - Size on Disk: shows how much of the cached partitions has actually been written to disk.
The following query can help check whether disk writes for a specific partition are skewed:
SELECT
  partition_column_name,
  COUNT(*) AS row_count,
  SUM(size_in_bytes) / 1024 / 1024 AS size_mb -- if metadata exposing file sizes is available
FROM
  table_name
GROUP BY
  1
ORDER BY
  row_count DESC
What is the first line of a Databricks Python notebook?
# Databricks notebook source
â This line is a special header (a magic comment) that identifies the file as a Databricks notebook when it is exported as a source file (e.g. a .py file) or synced with an external repository such as GitHub.
â What is the difference between DBFS root storage and external object storage mounted with dbutils.fs.mount()?
â In Databricks, both the DBFS (Databricks File System) root and external mounts (dbutils.fs.mount()) can be accessed with dbfs:/-style paths, but they differ greatly in substance and operation.
| Item | DBFS root storage | External object storage (Mount) |
|---|---|---|
| Substance | A dedicated S3/ADLS bucket generated automatically when the Databricks workspace is created. | Existing cloud storage (S3, ADLS, GCS) managed by the user. |
| Main uses | Storing libraries, initial sample data, temporary logs; the Hive metastore's default location. | Production data, large-scale data lakes, shared assets. |
| Security | All workspace users can read and write (fine-grained permission management is difficult). | Fine-grained access control is possible with cloud-side IAM roles and service principals. |
| Data persistence | Deleting the workspace in principle deletes this storage as well. | Data in the storage remains even if the workspace is deleted. |
| Recommendation | Low (storing sensitive data is discouraged). | High (recommended from a data-governance standpoint). |
Extracting and loading roughly 1 GB of data with %sh can take 20+ minutes — how do you deal with it?
- Why is %sh slow? (reasons it can take 20+ minutes)
  - Single-node limits
    - %sh runs only on the driver node (a single machine). It does not use Databricks' greatest strength — distributed processing across multiple nodes with Spark — at all.
  - Memory shortage and disk I/O
    - Trying to process 1 GB of data with only the driver's local memory and disk exhausts resources and causes thrashing (processing stalls).
  - Network bottleneck
    - Communication with cloud storage (S3/ADLS) goes through an unoptimized standard shell, so parallel transfer is impossible and very slow.
- Recommended countermeasures (steps to shrink a 20-minute job to seconds or a few minutes):
  - A. Install libraries with %pip and write the logic in Spark/Python
    - If you were fetching data with curl or wget in %sh, use Python libraries or Spark directly instead.
  - B. Use dbutils.fs (cp/mv)
    - When moving data between DBFS or mounted storage locations, use the Databricks-specific utility instead of the shell cp command; it is optimized under the hood.
What characterizes the programming model commonly used by Spark Structured Streaming?
- An infinitely growing table (unbounded table)
  - Structured Streaming treats streaming data as a "static table".
  - Each time new data arrives, it is conceptually appended as new rows at the end of the table.
- Incremental execution
  - Spark automatically detects new data appended to the input table and processes only the delta (increment) since the previous run.
- Output modes
  - When writing results out to an external sink (storage or console), one of three modes can be chosen:
  â Append: outputs only newly added rows (the most common).
  â Complete: writes out the entire result table every time (used for aggregations and the like).
  â Update: outputs only the rows that changed since the previous run.
- Efficiency
  - Instead of recomputing the whole dataset every time, only newly arrived data is joined into or aggregated with the existing results, so it operates at low latency.
Execution example
The basic flow for Structured Streaming is to start from spark.readStream and finish with writeStream, specifying an output mode.
# 1. Read as an infinitely growing table
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .load("/mnt/data/input_logs")
# 2. Incremental transformation
# (only new rows automatically become processing targets here)
processed_df = df.filter("action = 'click'").groupBy("user_id").count()
# 3. Specify the output mode
query = processed_df.writeStream \
    .outputMode("complete") \
    .format("delta") \
    .option("checkpointLocation", "/mnt/data/checkpoints") \
    .start("/mnt/data/output_table")
Example of the equivalent batch code
# use read instead of readStream
df = spark.read \
    .format("delta") \
    .load("/mnt/data/user_logs")  # reads only the data that exists at the moment of execution
# run the transformation
result = df.filter("event = 'jump'")
# use write instead of writeStream
# writes once and finishes (does not react on its own when new data arrives)
result.write \
    .mode("overwrite") \
    .save("/mnt/data/processed_results")
Which Databricks CLI command is used to upload a Python wheel to mounted object storage?
- To upload a Python wheel file (.whl) to mounted object storage (a mount point on DBFS), use the Databricks CLI command databricks fs cp.
What is the job-run-history retention period?
- In Databricks, the job run retention period is the length of time that the results, logs, and detailed statistics of executed jobs are kept in the system and remain inspectable via the UI or API.
- Once this period passes, past run data is deleted automatically.
Default retention
- With standard Databricks settings, job run history is retained under the following conditions:
  - Period: 60 days
  - Count limit: up to 5,000 runs per job
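The two limits combine: a run survives only while it is inside both the age window and the per-job count cap. A small illustrative model (run IDs and dates are made up):

```python
from datetime import datetime, timedelta

def retained_runs(runs, now, max_age_days=60, max_count=5000):
    """Keep runs newer than max_age_days, then cap at the most recent max_count."""
    fresh = [r for r in runs if (now - r["started"]).days < max_age_days]
    fresh.sort(key=lambda r: r["started"], reverse=True)  # newest first
    return fresh[:max_count]

now = datetime(2026, 3, 10)
# one run per day for the last 100 days
runs = [{"run_id": i, "started": now - timedelta(days=i)} for i in range(100)]
kept = retained_runs(runs, now)
print(len(kept))  # only the runs from the last 60 days survive
```

With a busier job (say, thousands of runs per day) the 5,000-run cap would bite before the 60-day window does.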
Should PII data be partitioned by block or field so that partition boundaries can be used with ACLs and delete statements?
- Yes. This design policy is very sound, and can be called one of the best practices for operating Databricks and data lakes.
- The main reasons are the following three:
  - Optimized delete statements (GDPR / personal-data-protection compliance): deleting along a partition boundary is far cheaper than row-level deletes scattered across files.
  - Simplified ACLs (access control): permissions can be granted along the same physical boundaries.
  - Performance gains from partition pruning: queries that filter on the partition key read less data.
Why might a table exceed 10 TB in total size while most of its files are under 64 MB?
- When a table totals about 10 TB but the individual files are under 64 MB (a state with huge numbers of tiny files), the Small File Problem — which sharply degrades data-platform performance — is occurring.
- Over-partitioning is the most common cause: partitioning on a high-cardinality column (one with very many distinct values) fragments the data.
  - Example: partitioning by user_id or a per-second timestamp makes the amount of data per partition (folder) extremely small.
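A quick pure-Python illustration of why a high-cardinality partition key fragments data (the row counts and key names are made up):

```python
from collections import Counter

def partition_sizes(rows, key):
    """Count rows per partition value for a candidate partition key."""
    return Counter(r[key] for r in rows)

# 10,000 rows: 1,000 distinct users, but only 12 distinct months
rows = [{"user_id": f"u{i % 1000}", "month": f"2026-{(i % 12) + 1:02d}"}
        for i in range(10_000)]

by_user = partition_sizes(rows, "user_id")   # 1,000 partitions of ~10 rows each
by_month = partition_sizes(rows, "month")    # 12 partitions of ~830 rows each
print(len(by_user), len(by_month))
```

Scaled up to 10 TB, partitioning by user_id would mean millions of tiny folders and files, while partitioning by month keeps each partition near a healthy file size.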
What are the benefits of the Change Data Feed feature, and its common use cases?
- Databricks (Delta Lake) Change Data Feed (CDF) is a feature that accurately tracks the history of inserts, updates, and deletes made to a table, including both the before and after values.
- Its greatest strength is that you can work not just with the latest data but with the process of change — "what changed, and how".
- Main benefits of CDF
  - Efficient incremental processing
    - No need to rescan the whole table: only the changes since the last processed point can be fetched, greatly reducing compute cost.
  - Distinguishing change types
    - Whether a row is a new insert, an update to an existing row, or a delete can be seen at a glance via the system column _change_type.
  - Before/after images
    - For updates, the pre-update value (update_preimage) and post-update value (update_postimage) are both available, making delta calculations on numeric values easy.
- Common use cases
  - Propagation downstream in a medallion architecture
    - As data flows Bronze (raw) → Silver (cleansed) → Gold (aggregated), CDF is used to reflect deletions of invalid data or corrections of historical data made in the Silver table into the downstream Gold tables.
  - Updating materialized views
    - Very effective when a table performing complex aggregations (sums, averages, etc.) should be updated incrementally using only the delta when part of the data changes.
  - Synchronization to external systems (CDC)
    - Used to notify or sync external search engines (Elasticsearch, etc.), core systems, and notification services precisely about which records changed, triggered by data updates inside Databricks.
  - Audit and compliance
    - Serves as a detailed audit log of "who changed what, from which value to which, and when". Especially powerful when PII change history must be tracked.
How to enable Change Data Feed
-- enable at table creation
CREATE TABLE student_scores (id INT, name STRING, score INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true);
-- enable on an existing table
ALTER TABLE student_scores SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
2. How to read the change history
Once CDF is enabled, the following four metadata columns are recorded in addition to the normal data:
â _change_type: the kind of change
  â insert
  â update_preimage
  â update_postimage
  â delete
â _commit_version: the version number at the time of the change
â _commit_timestamp: the timestamp at the time of the change
-- fetch the change history from version 0 to the latest
SELECT * FROM table_changes('student_scores', 0);
-- fetch the changes for a specific period
SELECT * FROM table_changes('student_scores', '2026-03-01 00:00:00');
table_changes is a special function that can be used only on CDF-enabled tables.
Range-specification patterns
â Version range: table_changes('my_table', 10, 15) — changes from version 10 through 15.
â Up to the latest: table_changes('my_table', 10) — all changes from version 10 to now.
â Timestamp: table_changes('my_table', '2026-03-01 10:00:00') — changes from that time onward.
What are Databricks widgets?
- Databricks widgets are a very convenient tool for turning a notebook into an interactive dashboard.
- In a word, they are a mechanism for passing variables (parameters) from on-screen input forms instead of editing code directly. With them, non-engineers can change values and recompute analysis results.
What widgets enable
- Dynamic filtering
  - Select a date or region and have charts refresh.
- Shared variables
  - Manage settings used across a pipeline (file paths, etc.) in one place.
- Parameterizing jobs
  - Receive parameters from external workflows (Azure Data Factory, etc.) at run time.
Widget types (4 in total)
| Widget name | Characteristics | Suited for |
|---|---|---|
| Text (text) | A box accepting free-form strings. | Names, IDs, keyword searches, etc. |
| Dropdown (dropdown) | Choose one item from a predefined list. | Prefectures, department names, ON/OFF status, etc. |
| Combobox (combobox) | Choose from a list, or enter a free value. | When candidates exist but exceptional values may be entered. |
| Multiselect (multiselect) | Choose multiple values from a list, checkbox-style. | Simultaneous analysis of multiple regions, narrowing to specific categories. |
Basic widget usage (Python examples)
Widgets are created and manipulated with dbutils.widgets.
Creating widgets
# example: create a dropdown
dbutils.widgets.dropdown("region", "Tokyo", ["Tokyo", "Osaka", "Nagoya"], "Select a region")
# example: create a text box
dbutils.widgets.text("user_id", "001", "Enter a user ID")
Getting values
To use a widget's value in code, use the get method.
selected_region = dbutils.widgets.get("region")
print(f"Selected region: {selected_region}")
Removing widgets
dbutils.widgets.remove("region")  # remove a specific widget
dbutils.widgets.removeAll()       # remove all widgets
On the benefits of creating views
What is the benefit of creating a view that sets aliases on selected fields while preserving the original data schema and table names?
In short: without changing the physical data structure (the source table), only the window consumers see (the view) gets friendlier names, a reorganized layout, or a narrowed set of columns.
Concretely:
If the original table's column names are system-style (e.g. cust_id_01), a view can expose human-readable names (e.g. customer_id) instead.
SQL sketch:
CREATE VIEW sales_v AS
SELECT
  cust_id_01 AS customer_id,  -- set an alias
  sls_amt AS sales_amount     -- set an alias
FROM raw_data.sales_table;    -- original schema and table name
- What "preserving the original data schema and table names" means
  - The key point is that the source data is not touched at all. The data itself remains in the original schema (e.g. raw_data), in the original table (e.g. sales_table), with the original column names.
  - The view's role: a view is only a definition, so it changes how the data looks when referenced, without copying it.
- Why do this? (benefits)
  - Business-friendly naming
    - Complex physical tables built by engineers can be exposed with names analysts find easy to use — standard business terms rather than system codes.
  - Minimized impact of change
    - If a physical column name changes for system reasons, adjusting the alias on the view side means dashboards (Tableau, Power BI, etc.) that use the view need no fixes.
  - Hiding unnecessary columns
    - If the source table has 100 columns, selecting just the 10 relevant ones with aliases in the view keeps consumers from getting lost.
  - Maintaining compatibility
    - Keeping the column names an old system used as aliases allows migrating to a new table structure without breaking tools during the transition period.
Delta Lake automatically collects statistics on the first how-many columns of each table, used for data skipping based on query filters?
Delta Lake automatically collects statistics for data skipping (Data Skipping) on the first 32 columns of a table.
How the 32-column statistics work
When writing data to a table, Delta Lake records, for each data file (Parquet file), the minimum value, maximum value, and null count of the values it contains.
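Data skipping follows directly from those per-file min/max statistics: a file whose value range cannot satisfy the filter is never opened. A minimal sketch of that pruning decision (the file stats here are made up):

```python
def files_to_read(file_stats, column, lower_bound):
    """Keep only files whose max value can satisfy 'column > lower_bound'."""
    return [f["path"] for f in file_stats
            if f["stats"][column]["max"] > lower_bound]

file_stats = [
    {"path": "part-0.parquet", "stats": {"age": {"min": 1,  "max": 19}}},
    {"path": "part-1.parquet", "stats": {"age": {"min": 18, "max": 45}}},
    {"path": "part-2.parquet", "stats": {"age": {"min": 50, "max": 90}}},
]
# WHERE age > 20: part-0 (max 19) can be skipped without ever being opened
print(files_to_read(file_stats, "age", 20))
```

The same logic, applied across thousands of files, is why filtering on one of the first 32 columns can avoid most of a table's I/O.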
What is SQL's DESCRIBE EXTENDED command?
In Databricks (and Apache Spark SQL), DESCRIBE EXTENDED is a command that displays, in addition to a table's basic structure (column names and data types), more detailed metadata such as its storage location and table format.
A plain DESCRIBE (or DESC) shows only column information, but adding EXTENDED lets you check, at once, information essential to operations — where the table lives, whether it is an external or managed table, and so on.
Main information obtained
Running the command shows a "Detailed Table Information" section below the usual column list, with the following items:
| Item | Content |
|---|---|
| Database | The database (schema) the table belongs to. |
| Table | The table name. |
| Owner | The table's owner (creator). |
| Created Time | Creation timestamp. |
| Last Access | Last-access timestamp. |
| Created By | The Spark version that created it, etc. |
| Type | MANAGED (managed table) or EXTERNAL (external table). |
| Location | The cloud-storage path (S3/ADLS/GCS) where the actual data files are stored. |
| Provider | The data format (delta, parquet, csv, etc.). |
| Table Properties | Custom settings such as delta.minReaderVersion and delta.dataSkippingNumIndexedCols. |
What does "iterating over a stored secret and printing each character" mean?
Concrete image (Python example)
Say the secret password is TopSecret. Code that iterates over it and prints each character looks like this:
# the stored secret
secret = "TopSecret"
# iteration (for loop)
for char in secret:
    print(char)
Printing the secret whole is redacted by Databricks, but printing it one character at a time can bypass that redaction — which is why secrets need careful handling in loops.
Why must a new checkpointLocation be specified when changing a Structured Streaming query? (e.g. adding new fields or changing the schema)
In Structured Streaming, the checkpoint (checkpointLocation) plays the role of the query's ledger (state management).
When a breaking change (adding new columns, changing aggregation logic) is made to a query, a new path must be specified to prevent inconsistency (contradiction).
- What a checkpoint contains
  - A checkpoint directory stores not just progress (offsets) but the following key information:
  - Offset: how far data has been read.
  - Commit: which data has finished being written.
  - Schema: the structure of the data at execution time.
  - State: intermediate results of aggregations (window functions, GROUP BY).
- Why can't the same location be reused?
  - If the schema was changed but the old checkpoint is reused, Spark panics (errors) like this:
  - Schema mismatch
    - "The saved ledger has only 'name' and 'amount', but the new code adds a 'tax rate' column — I don't know where to resume the computation."
  - State incompatibility
    - If the previous aggregation state (State) was saved with a two-column structure, continuing the computation with new three-column code causes data inconsistency at the binary level.
- When to renew the checkpoint
  - For changes like the following, you basically need to specify a new checkpointLocation (or delete the existing folder):
| Change | Reason |
|---|---|
| Adding or removing columns | No longer matches the stored schema information. |
| Changing aggregation keys | The structure of the internally held state (State) changes. |
| Changing a UDF (user-defined function) | Changed logic breaks consistency with the intermediate results. |
| Changing the output mode | e.g. append → complete is managed differently. |
When updating a table schema, what happens if no default value is specified for an added field?
- Existing rows get NULL in the new column.
What does joining a streaming DataFrame with a static table look like?
It is common to place the static table on the right side of the join.
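In PySpark the shape would be streaming_df.join(static_df, "product_id") with the static DataFrame on the right. A pure-Python model of the semantics — each arriving streaming record is looked up against a fixed static side (all names and data below are illustrative):

```python
# Static side: a fixed lookup table (think: a small dimension table of products).
static_products = {"p1": "keyboard", "p2": "mouse"}

def enrich(event, static_side):
    """Join one streaming event against the static table on product_id."""
    name = static_side.get(event["product_id"])  # None models an inner-join drop
    return {**event, "product_name": name}

# Streaming side: events arrive one at a time; the static side never changes mid-query.
stream = [{"order": 1, "product_id": "p2"}, {"order": 2, "product_id": "p1"}]
joined = [enrich(e, static_products) for e in stream]
print(joined[0]["product_name"])  # mouse
```

The key property the model captures: the streaming side drives the join, while the static side is just a lookup that each micro-batch consults.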
What is a Databricks High Concurrency cluster?
A Databricks High Concurrency cluster is a cluster type specialized for multiple users sharing resources and running queries concurrently.
It is designed mainly as a shared environment where teams of data scientists and analysts use one cluster for interactive analysis in SQL, Python, R, and so on.
- Main characteristics and benefits
  - How High Concurrency clusters differ from Standard clusters:
| Characteristic | Content |
|---|---|
| Resource optimization | Resources are allocated to queries in fine slices, preventing one user's heavy query from monopolizing the whole cluster. |
| User isolation (Isolation) | Processes are isolated so that multiple users working on the same cluster do not interfere with each other's variables and functions. |
| Autoscaling | Worker nodes are added automatically as queued queries grow, and removed as load falls, containing cost. |
| Language support | SQL, Python, and R are supported; Scala is not, due to architectural constraints. |
- High Concurrency vs. Standard
  - Which to choose by use case:
| Item | High Concurrency | Standard |
|---|---|---|
| Main use | Multi-user interactive analysis, BI-tool connections | Single-user development, complex ETL jobs |
| Concurrency | Very high (optimal for many small queries) | Low (resources concentrated on one large job) |
| Supported languages | SQL, Python, R | SQL, Python, R, Scala |
| Security | Table-level access control possible | Full isolation between users is limited |
Which REST API call is needed to check which notebook is configured to run as a task in a multi-task job?
To check which notebook is assigned to which task in a multi-task job, use the Jobs API's GET /api/2.1/jobs/get.
Calling this endpoint returns the dependencies among all tasks in the job and detailed information including the notebook path each task references.
API endpoint to use
| Item | Content |
|---|---|
| Endpoint | GET /api/2.1/jobs/get |
| Required parameter | job_id (the ID of the job to inspect) |
| Main information returned | Task execution order, notebook paths, arguments, cluster settings, etc. |
What does it mean that a checkpoint directory is used to track state information for the unique keys present in a join?
In Structured Streaming stream-stream joins (Stream-Stream Join), the checkpoint directory plays a crucial role.
"Tracking state information for unique keys" is, simply put, a mechanism for temporarily storing data whose counterpart has not yet appeared, so it can be joined later.
Why is tracking "state" (State) necessary?
Unlike joins between static tables (batch joins), in streaming the data arrives at unpredictable times.
â Example: joining "ad impression logs" with "click logs"
  â Problem: at the moment an impression log arrives, the click may not have happened yet (the click log has not arrived).
  â Solution: store the impression log as state (State) in the checkpoint, and when the click log arrives later, match it against the stored impression log by key.
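The buffering just described can be modeled in a few lines of plain Python (the event data is made up; real Structured Streaming persists this state in the checkpoint and expires it with watermarks):

```python
# State: impressions waiting for their matching click, keyed by ad_id.
state = {}
matches = []

def on_impression(ad_id, ts):
    state[ad_id] = ts  # no counterpart yet -> buffer as state

def on_click(ad_id, ts):
    if ad_id in state:  # counterpart already buffered -> emit the joined pair
        matches.append((ad_id, state.pop(ad_id), ts))

on_impression("ad42", 100)  # click has not happened yet -> held as state
on_click("ad42", 105)       # now the pair can be joined
on_click("ad99", 110)       # no matching impression -> nothing emitted
print(matches)  # [('ad42', 100, 105)]
print(state)    # {} -- the matched state was released
```

Without a watermark, state for impressions that never get clicked would grow forever — which is exactly why the checkpoint and watermark settings matter.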
What is the difference between Delta Lake's Change Data Feed and the Time Travel feature?
In Databricks, Change Data Feed (CDF) and Time Travel are both features concerning data history, but their purposes and mechanisms differ greatly.
In short: Time Travel is for viewing a past snapshot; CDF is for extracting the delta of what changed.
- Main differences
| Item | Time Travel | Change Data Feed (CDF) |
|---|---|---|
| Main purpose | Reproduce or restore the data as of a specific point in the past. | Extract only the changed rows (inserts/updates/deletes) and flow them downstream. |
| Output | The state of the entire table at that point in time. | The changed "delta" plus the type of each change (Insert/Update/Delete). |
| Update tracking | Pre-update values can be retrieved, but determining what happened row by row is difficult. | Pre-update values (pre-image) and post-update values (post-image) are kept, clearly distinguished. |
| Enablement | Enabled by default (a standard Delta-table capability). | Must be enabled explicitly (delta.enableChangeDataFeed = true). |
What is pipelines.reset.allowed?
pipelines.reset.allowed is a property in Databricks Delta Live Tables (DLT) for controlling whether recomputation (reset) of a table is allowed.
Normally, running a "reset" on a DLT pipeline deletes the existing data and re-reads and reprocesses all data from the source. Setting this property to false prevents unintended data loss and costly full reprocessing.
- Main roles and behavior
| Setting | Behavior |
|---|---|
| true (default) | A "reset" can be run from the pipeline settings; the existing table data is deleted and rebuilt from scratch. |
| false | Reset operations are forbidden; attempting a reset raises an error and the data is protected. |
2. Why is this property needed?
On large data platforms and in production environments, setting it to false is recommended to avoid the following risk:
- Expensive recomputation cost: accidentally resetting a multi-terabyte or multi-petabyte table incurs enormous compute cost and time for reprocessing.
What is automatic processing of CDC data from external systems?
Automated processing of CDC (Change Data Capture) data from external systems refers to a mechanism that detects, in near real time, the history of inserts, updates, and deletes occurring in databases (SQL Server, Oracle, MySQL, etc.) and automatically reflects them into Delta Lake on Databricks.
In Databricks, this complex processing is implemented concisely mainly with the Delta Live Tables (DLT) feature APPLY CHANGES INTO.
- Overall processing flow
  - The standard steps from data arriving from an external system to being reflected in a Delta table:
| Step | Content |
|---|---|
| 1. Capture | Extract "which rows changed, and how" from the external DB's log (using Debezium, Fivetran, etc.). |
| 2. Ingestion | Stream the change log into Databricks via a message queue (Kafka, Azure Event Hubs, etc.). |
| 3. Transform | Parse the incoming log (rows flagged Insert/Update/Delete). |
| 4. Apply | Use APPLY CHANGES INTO to write (merge) the latest state into the target table. |
2. Why is automation needed? (problems and solutions)
Writing SQL MERGE statements by hand is possible, but CDC data has distinctive difficulties:
| Problem | How automation (DLT's APPLY CHANGES) solves it |
|---|---|
| Ordering control | Even when "update → delete" for the same row arrives in reverse order, sequence numbers or timestamps are used to process it correctly. |
| Handling deletes | Records deleted on the source side are automatically physically or logically deleted on the target side. |
| Schema evolution | When new columns appear on the source side, the target table definition can be extended automatically. |
| Deduplication | When multiple updates for the same ID arrive in a short period, only the latest state is applied, for efficiency. |
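The ordering and dedup problems in the table above can be made concrete with a small pure-Python model of applying a change log. APPLY CHANGES INTO does this (and more) declaratively; the events and field names here are made up:

```python
def apply_changes(target, events):
    """Apply CDC events in sequence order, keeping only the latest per key."""
    last_seq = {}
    for ev in sorted(events, key=lambda e: e["seq"]):  # restore source order
        key = ev["id"]
        if ev["seq"] <= last_seq.get(key, -1):
            continue  # stale or duplicate event -> skip
        last_seq[key] = ev["seq"]
        if ev["op"] == "delete":
            target.pop(key, None)
        else:  # insert / update
            target[key] = ev["value"]
    return target

# Events arrive out of order: the delete (seq 3) is received before the update (seq 2).
events = [
    {"seq": 1, "op": "insert", "id": "a", "value": 10},
    {"seq": 3, "op": "delete", "id": "a", "value": None},
    {"seq": 2, "op": "update", "id": "a", "value": 20},
]
print(apply_changes({}, events))  # {} -- the delete wins because it is newest
```

Applying the events in arrival order instead would incorrectly resurrect the row with value 20 — exactly the bug the sequence-number handling prevents.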
The relationship between predicate pushdown and the physical plan
The relationship between predicate pushdown (Predicate Pushdown) and the physical plan (Physical Plan) is, in a word, an optimization process that builds filtering into the earliest stage of data reading in order to maximize query execution efficiency.
It matters greatly in the context of how Spark's query engine (the Catalyst Optimizer) interprets the SQL or DataFrame operations we write and converts them into physical operations.
What is predicate pushdown?
A "predicate" (Predicate) is a SQL WHERE clause or filter() condition (e.g. age > 20).
Rather than reading all the data into memory and then filtering, narrowing the data down as early as possible — on the data-source (storage) side — is called "pushing down" the predicate.
The relationship to the physical plan
When Spark executes a query, the processing is transformed from a logical plan into a physical plan. Predicate pushdown is reflected in that process as follows:
| Phase | Movement of the predicate |
|---|---|
| Logical plan (Logical Plan) | Composed in the order the user wrote it: "read the data → then filter". |
| Optimized plan (Optimized Plan) | Catalyst optimization moves the filter condition earlier in the processing tree, closer to the read. |
| Physical plan (Physical Plan) | The core: the filter condition is embedded into the FileScan operation that actually reads the files (Parquet/Delta). |
What is pyspark.sql.functions.broadcast?
pyspark.sql.functions.broadcast is a function for drastically speeding up Spark joins.
In short, it is a mechanism that copies (broadcasts) the smaller table, in its entirety, to every worker node, thereby avoiding heavy network data movement (shuffles).
- How it works: Broadcast Hash Join
  - In a normal join (Sort Merge Join), a "shuffle" occurs: both tables' data is reordered across the network based on the join key, and this becomes a bottleneck.
  - Using broadcast changes the flow as follows:
  â Collect: the small table is first gathered on the driver node.
  â Distribute: that data is copied and delivered to every worker node (Executor).
  â Local join: each worker joins the fragments of the large table it holds with the distributed small table in place.
- Pros and cons
| Item | Pro | Con / risk |
|---|---|---|
| Performance | Very fast, because large-scale data movement (shuffles) disappears. | Out-of-memory (OOM) risk: broadcasting a table that is too large crashes the Executors. |
| Resources | Network bandwidth consumption is contained. | Consumes memory on the driver and every Executor. |
3. Basic usage
from pyspark.sql.functions import broadcast
# assume large_df has billions of rows, small_df thousands
# distribute small_df to all nodes and join
result_df = large_df.join(broadcast(small_df), "id")
What is the skew that arises when too much data is assigned to a subset of Spark partitions?
In Spark, skew (Data Skew) is the phenomenon where data concentrates extremely on specific partitions, overloading a subset of the worker nodes.
The ideal in distributed processing is for every node to finish its work evenly. When skew occurs, most nodes sit idle while a single node grinds on, and the whole job's elapsed time is dragged out by that one slow node.
- How skew arises
  - Spark splits data into units called partitions and processes them in parallel. Normally a hash function distributes the data evenly, but when data concentrates on particular keys, skew occurs.
| State | Data distribution | Processing behavior |
|---|---|---|
| Normal (even) | All partitions are roughly the same size (e.g. 100 MB each). | All nodes finish at about the same time; efficiency is maximized. |
| Skewed | A specific partition is huge (e.g. one is 10 GB while the rest are 10 MB). | The whole job cannot finish until the worker handling the huge partition does. |
2. Why does skew happen? (main causes)
| Cause | Content |
|---|---|
| Concentration on specific keys | During joins (Join) or aggregations (GroupBy), tens of millions of rows pile up on one ID (e.g. NULL, "default", or a whale customer's ID). |
| Poor partitioning design | A low-cardinality column (few distinct values) was chosen as the partition key. |
| Data characteristics | The real-world data is inherently biased, e.g. concentrated on particular days. |
- Problems skew causes
  â Long job runtimes: 99% of tasks finish in seconds while the remaining 1% take hours.
  â Out of memory (OOM): the data does not fit in one worker's memory and the Executor crashes.
  â Wasted resources: while the slow nodes are waited on, the other nodes' compute resources sit idle.
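The imbalance is easy to see with a toy hash-partitioning experiment (the key distribution is made up; crc32 stands in for Spark's hash partitioner so the run is deterministic):

```python
import zlib
from collections import Counter

def partition_of(key: str, num_partitions: int = 8) -> int:
    """Deterministic stand-in for Spark's hash partitioner."""
    return zlib.crc32(key.encode()) % num_partitions

# 90% of rows share one hot key (think: NULLs mapped to "default"); the rest spread out.
keys = ["default"] * 9000 + [f"user{i}" for i in range(1000)]
sizes = Counter(partition_of(k) for k in keys)

biggest = max(sizes.values())
smallest = min(sizes.values())
print(f"largest partition: {biggest} rows, smallest: {smallest} rows")
```

All 9,000 "default" rows hash to the same partition, so one worker carries roughly 90% of the data while the others share the remainder — the exact shape of the skewed row in the table above.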
What does it mean to keep data-quality rules in a Delta table outside the pipeline's target schema?
- The shape of the setup
  - Targets (outputs): the tables actually used by the business, such as the sales table and customer table.
  - Management Delta table (rule source): a table holding only configuration data — which checks (e.g. NOT NULL) apply to which columns of which tables.
  - Location: placed in a management schema separate from the business data (e.g. metadata_db.quality_rules).
- Why keep the rules externally? (benefits)
  1. Rules can be updated without code changes
    - When a new requirement arrives ("this column must never be negative"), instead of rewriting and redeploying Python/SQL code, adding one row to the management table applies it from the next run.
  2. Non-engineers can manage rules
    - Data analysts and business owners can adjust quality rules themselves via the management table (or a GUI that edits it).
  3. Central management and reuse of rules
    - When multiple pipelines need the same checks (e.g. address-format validation), defining them once in one table lets every pipeline simply reference it.
Example
from pyspark.sql import functions as F
def apply_quality_rules(df, target_table_name):
    # 1. fetch this table's rules from the external table
    rules = spark.table("metadata.quality_rules") \
        .filter(F.col("table_name") == target_table_name) \
        .collect()
    # 2. apply the rules
    # here: add one "failed" flag column per rule, True for violating records
    for rule in rules:
        flag_name = f"is_failed_{rule['rule_id']}"
        # True when the condition is NOT met (quality violation)
        df = df.withColumn(flag_name, F.expr(f"NOT ({rule['condition']})"))
    return df
Example rule table
| rule_id | table_name | column_name | rule_type | condition (SQL expression) | expectation_level |
|---|---|---|---|---|---|
| 1 | sales_gold | order_id | NOT_NULL | order_id IS NOT NULL | FAIL |
| 2 | sales_gold | amount | POSITIVE | amount > 0 | WARN |
| 3 | sales_gold | status | VALID_LIST | status IN ('ordered', 'shipped') | WARN |
| 4 | player_logs | pos_x | STAGE_OUT | pos_x >= 0 | WARN |
| 5 | player_logs | hp | MAX_CAP | hp <= 999 | WARN |
Creating the rule table
-- master table managing the quality rules
CREATE TABLE IF NOT EXISTS metadata.quality_rules (
  rule_id INT,
  table_name STRING,        -- target table
  column_name STRING,       -- target column
  rule_type STRING,         -- rule kind (NOT_NULL, RANGE, etc.)
  condition STRING,         -- SQL condition expression
  expectation_level STRING  -- 'FAIL' (stop) or 'WARN' (warn only)
) USING DELTA;
-- insert sample data
INSERT INTO metadata.quality_rules VALUES
  (1, 'sales_gold', 'order_id', 'NOT_NULL', 'order_id IS NOT NULL', 'FAIL'),
  (2, 'sales_gold', 'amount', 'POSITIVE_VALUE', 'amount > 0', 'WARN');
Using the function above and emitting violation information
from pyspark.sql import functions as F
# apply_quality_rules as defined above
raw_df = spark.table("stg_sales")
validated_df = apply_quality_rules(raw_df, "sales_gold")
# identify violating records (the rules must be fetched again in this scope)
rules = spark.table("metadata.quality_rules") \
    .filter(F.col("table_name") == "sales_gold") \
    .collect()
failed_records = validated_df.filter(
    " OR ".join([f"is_failed_{r['rule_id']}" for r in rules])
)
Give an overview and usage examples of Delta Live Tables
It is "a framework in which you merely declare the data flow (pipeline) in SQL or Python, and Databricks automates building and operating it".
What is Delta Live Tables (DLT)?
Normally, building a data pipeline means implementing table creation, data loading, transformation, error handling, retry logic, and so on in detail, and scheduling the result as jobs.
With DLT, you only describe, declaratively, what state the tables should be in, and the following work is handled for you:
- Automatic infrastructure management
  - Start the cluster at run time, stop it when done, scale with load.
- Automatic dependency resolution
  - Ordering such as "after table A is built, update table B" is determined automatically.
- Data quality management (Expectations)
  - The quality checks discussed above are supported as a standard feature.
- Lineage (history) visualization
  - The data flow can be inspected in a GUI.
DLT usage examples (SQL and Python)
DLT is ideal for building the Bronze (raw) → Silver (cleansed) → Gold (aggregated) flow known as the Medallion Architecture.
A. SQL example (declarative style)
In SQL, attaching the LIVE keyword is enough to make a table part of the pipeline.
-- 1. Bronze: ingest raw data (Auto Loader)
CREATE OR REFRESH STREAMING LIVE TABLE sales_raw
AS SELECT * FROM cloud_files("/mnt/data/sales_json", "json");
-- 2. Silver: cleanse with a quality rule applied
CREATE OR REFRESH STREAMING LIVE TABLE sales_cleaned (
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.sales_raw;
B. Python example
This pairs very well with the earlier pattern of reading rules from an external table.
import dlt
from pyspark.sql import functions as F
@dlt.table
def sales_gold():
    # flexible logic is possible here, e.g. fetching rules from an external Delta table
    return spark.readStream.table("LIVE.sales_cleaned") \
        .groupBy("category") \
        .agg(F.sum("amount").alias("total_sales"))
Why use DLT? (benefits)
- Easy quality checks
  - Just writing EXPECT (condition) ON VIOLATION DROP ROW automatically excludes (or warns about) invalid data.
- Unified streaming and batch
  - The same code covers both streaming (update whenever data arrives) and periodic batch runs.
- Error recovery
  - If the pipeline stops midway, it resumes automatically from the checkpoint.