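Introduction
These notes collect topics that come up often on the Databricks Certified Data Engineer Associate exam, together with points I only half understood and wrote down while studying.
Each topic is kept short and separate, so use the notes for a quick review.
They should also be useful if you simply want to learn about handy Databricks features.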
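Is there a form of .groupBy("id").agg(F.sum("amount").alias("total")) without the F prefix? (And is alias always required?)
→ It can also be written like this:
from pyspark.sql.functions import sum  # note: this shadows Python's built-in sum
# Runs without the F. prefix
df.groupBy("id").agg(sum("amount").alias("total"))
alias is optional. Written as below, the column is simply named sum(amount):
from pyspark.sql import functions as F
# No alias
df.groupBy("id").agg(F.sum("amount"))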
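Is there any point in running OPTIMIZE twice in a row?
If the table's data has not changed in between, running OPTIMIZE a second time is basically pointless: the first run has already compacted the files, so the second run has nothing left to rewrite.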
What is Lakehouse Federation?
Unity Catalog acts as the control tower: you create a "connection" to an external source such as MySQL, PostgreSQL, Snowflake, or BigQuery, and mount it in Databricks as a "foreign catalog".
| Comparison | External Table | Lakehouse Federation |
|---|---|---|
| Connects to | Cloud storage (S3 / ADLS / GCS) | External database (MySQL, Snowflake, BigQuery, etc.) |
| Data format | Files (Delta, Parquet, CSV, JSON) | Tables inside the database (each database's native format) |
| Compute engine | Databricks reads the files and processes them | Work that can be pushed down runs on the source database, which returns only the results |
| Performance | Fast (especially with Delta, where optimizations apply) | Depends on the source database (tends to be slower) |
| Operational overhead | Schema definitions and paths must be managed | Creating the connection is all that is needed |
Is MANAGED needed when creating an external table? (Or is CREATE TABLE enough?)
No MANAGED keyword is needed to create an external table.
What decides the table type is whether or not you write a LOCATION clause.
| Table type | SQL | Where the data is stored | What happens on DROP TABLE? |
|---|---|---|---|
| Managed Table | CREATE TABLE ... | Dedicated storage managed by Databricks | Both the metadata and the data files are deleted |
| External Table | CREATE TABLE ... LOCATION 's3://...' | A path you specify yourself (S3, ADLS, etc.) | Only the metadata is deleted; the data files remain |
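To make the "LOCATION decides the table type" rule concrete, here is a minimal sketch that only builds the DDL strings. The helper name create_table_ddl, the table names, and the bucket path are all made up for illustration; on a cluster, the resulting string would be passed to spark.sql.

```python
from typing import Optional


def create_table_ddl(table: str, columns: str, location: Optional[str] = None) -> str:
    """Build a CREATE TABLE statement; adding LOCATION makes the table external."""
    ddl = f"CREATE TABLE {table} ({columns})"
    if location is not None:
        # The LOCATION clause is the only difference between the two table types.
        ddl += f" LOCATION '{location}'"
    return ddl


# Hypothetical table names and storage path, for illustration only.
managed_ddl = create_table_ddl("sales_managed", "id INT, amount INT")
external_ddl = create_table_ddl(
    "sales_external", "id INT, amount INT", location="s3://example-bucket/sales"
)
print(managed_ddl)
print(external_ddl)
```

Neither statement contains a MANAGED keyword; the second one produces an external table purely because of the LOCATION clause.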
Should historical data be kept in Bronze?
In short: yes. Historical data should always be retained in the Bronze layer, so that the downstream Silver and Gold tables can be rebuilt from it when needed.
What is Photon? (What tends to be asked)
- Asked: How it differs from the conventional Spark engine (JVM/Java).
- Answer: Photon is implemented in C++ and uses a technique called vectorized execution.
- Why: It avoids the overhead of Java memory management (garbage collection) and pushes CPU capabilities such as SIMD instructions to their limit.
- Asked: Which compute types can use Photon?
- Answer:
  - SQL Warehouse: Enabled by default on Pro and Serverless (it cannot be turned off).
  - All-Purpose / Job Cluster: Can be enabled with a checkbox when the cluster is created.
Within a Databricks job, do compute resources scale in and out automatically?
Yes. Automatic scale-in and scale-out (adding and removing worker nodes) is available when autoscaling is configured on the job's cluster.
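As a rough illustration, autoscaling is requested by giving the cluster a worker range instead of a fixed worker count. The sketch below shows the shape of such a cluster spec as a Python dict; the runtime version and node type are placeholders rather than real values, and the worker range of 2 to 8 is an arbitrary assumption.

```python
import json

# Sketch of a job cluster spec with autoscaling enabled.
# "<runtime-version>" and "<node-type>" are placeholders, not real values.
new_cluster = {
    "spark_version": "<runtime-version>",
    "node_type_id": "<node-type>",
    # Databricks adds or removes workers within this range based on load.
    "autoscale": {"min_workers": 2, "max_workers": 8},
}

print(json.dumps(new_cluster, indent=2))
```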
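Can JSON be read with spark.read.json("path") or spark.read.format("json").load("path")?
Either form reads JSON files correctly.
What is the command for reading JSON data from a storage container? Is something like .format("cloud") used?
There is no format("cloud"). For incremental ingestion from cloud storage, Auto Loader uses format("cloudFiles") with the cloudFiles.format option set to "json".
If the schema changes and I want the job to fail, which option is used? Is there something like failOn<...>?
When you want the stream to stop with an error instead of carrying on after a schema change, the option is
cloudFiles.schemaEvolutionMode, set to the value failOnNewColumns.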
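A small sketch of how these options fit together. The helper autoloader_options and the paths are hypothetical; the option keys and the four mode values are the ones Auto Loader accepts. The snippet only builds the options dict, so it runs without a cluster.

```python
# Values accepted by cloudFiles.schemaEvolutionMode.
SCHEMA_EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def autoloader_options(file_format: str, schema_location: str, evolution_mode: str) -> dict:
    """Build an Auto Loader options dict, rejecting unknown evolution modes."""
    if evolution_mode not in SCHEMA_EVOLUTION_MODES:
        raise ValueError(f"unknown schemaEvolutionMode: {evolution_mode}")
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }


# Hypothetical schema location path, for illustration only.
options = autoloader_options("json", "/tmp/example/_schemas", "failOnNewColumns")
print(options)

# On a Databricks cluster the dict would be handed to the stream reader:
# df = spark.readStream.format("cloudFiles").options(**options).load("/path/to/raw")
```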
What REFRESH TABLE means
In Databricks (Spark SQL), REFRESH TABLE is the command that brings a table's metadata up to date and clears its cache.
It is used when "the data should be there, but my query doesn't reflect it!", and works like a "reload" button.
- Example: Another system overwrote Parquet files on S3.
- Result: Even when you run SELECT, Spark looks at the previously loaded cache and stale metadata, so the new data does not show up.
- Fix: Run REFRESH TABLE table_name to rescan the metadata and invalidate the cache.
Does Delta Sharing require sharing the Unity Catalog metastore ID?
For sharing between Databricks workspaces (UC-to-UC), yes: the recipient's metastore ID (more precisely, its "sharing identifier") must be given to the provider.
What is the Lineage (data lineage) feature of Unity Catalog?
It is a feature that automatically records and visualizes the "family tree of your data".
It answers questions such as "Where did the data in this table come from?" or "If I drop this column, which reports break?" right away, without anyone reading through code.
1. What Lineage shows
Unity Catalog lineage shows more than just "how tables are connected".
- Table-level lineage: Which source tables a target table was built from.
- Column-level lineage: (This one is very powerful!) Which columns of the original data a given column was computed from.
- Connections between entities: Not only tables, but also the notebooks that processed them, the jobs that ran, and the dashboards (AI/BI) that finally display them.
- AI model lineage: Which datasets a model was trained on (via integration with Feature Store).
What is an All-Purpose Cluster?
It is a compute resource that data scientists and engineers use freely and interactively for development, experiments, and analysis.
When you open a Databricks notebook and run code cell by cell while checking the results, this is the cluster working behind the scenes.
| Item | All-Purpose Cluster | Job Cluster |
|---|---|---|
| Main use | Development, analysis, debugging | Production runs, scheduled execution (ETL) |
| Start | Manually by the user (or when a notebook attaches) | Created automatically when the job starts |
| Termination | Manually by the user (or by an auto-termination setting) | Destroyed automatically when the job finishes |
| Cost (DBU) | Higher | Lower (roughly half) |
| Persistence | Configuration remains after the cluster stops | Thrown away after each job |
Difference between dlt.view and dlt.table
| Feature | dlt.table (table) | dlt.view (view) |
|---|---|---|
| Data persistence | Saved to storage in Delta format | Not saved to storage |
| External access | Can be queried from other notebooks and BI tools | Usable only inside the DLT pipeline |
| When it is computed | Computed once per pipeline run | Recomputed each time a downstream step reads it |
| Main use | Final results, intermediate backups, data exposed externally | Intermediate processing (cleaning, removing sensitive data) |
| Storage cost | Incurred | Not incurred |
When to use dlt.view
Best when "the result of this step does not need to be visible outside the pipeline".
- Intermediate transformations: "Prep work" for the next step, such as renaming columns or casting types.
- Filtering: Removing invalid data.
- Cost reduction: No write to storage (I/O) takes place, so you avoid storing unneeded data and can speed up the pipeline.
When to use dlt.table
Required when you want to "keep the result as data" or "see it from outside the pipeline".
- Each medallion layer: The Bronze, Silver, and Gold stages.
- Use in BI tools: Final data to be visualized in dashboards and the like.
- Avoiding expensive recomputation: When you want to store the result of a very heavy computation (such as a JOIN between large tables) and reuse it many times downstream.
Can Databricks development be done in the IDE of your choice?
Yes.
1. The "send only the code" style (VS Code extension / DABs)
This is not "reproducing the environment" but a division of labor: "edit here, run over there".
- How it works: Files written in your local IDE are synced to the Databricks workspace (remote) the moment you save them.
- Execution: When you press "Run", an instruction to "run the file I just sent" goes to a cluster on Databricks.
- Benefit: Even if your local PC is underpowered, you get the full power of the cloud (GPUs, large memory).
2. The "only the brain is local" style (Databricks Connect)
This may feel closest to "reproducing the environment".
- How it works: You install the databricks-connect library locally. It stands in locally for Spark's "control tower" (the driver).
- Reproducibility: By matching the local Python environment (library versions and so on) closely to the Databricks cluster runtime (DBR), you drive remote workers as if everything were running on your machine.
- Benefit: The IDE debugger (step execution) works as expected.
Can the Databricks debugger show variables? Is this also offered by the Spark CLI?
Yes, you can "see" variable values in real time.
The Databricks debugger does more than pause code: it is good at visualizing the state in memory at that point. Its role and features are completely different from the Spark CLI (command line), though.
| Feature | Databricks debugger | Spark CLI (pyspark / spark-shell) |
|---|---|---|
| Interface | GUI (browser / VS Code) | Text (terminal) |
| Inspecting variables | Listed in a dedicated panel | Checked by hand by typing print() or dir() |
| Breakpoints | Set with a single click | Require embedding code such as pdb.set_trace() |
| Main purpose | Fixing and debugging complex logic | Quick checks and small-scale command execution |
What is CPU time? Is it usually a problem when it is longer than the job time?
In short: in Databricks (Spark), which processes in parallel, "CPU time > job time" is a normal state. There is a problem only when the ratio or the breakdown looks wrong.
1. Difference between CPU time and job time
- Job time (wall-clock time / elapsed time): The "actual waiting time" you would measure with a stopwatch, that is, the real time from when the job starts until it ends.
- CPU time: The total time that all CPU cores in the cluster spent working.
[!TIP]
This is the same idea as man-hours.
- If 10 people work together on a task for 1 hour:
  - Job time = 1 hour
  - CPU time = 10 hours (10 people × 1 hour)
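The man-hour analogy can be turned into a quick calculation. The two helper functions below are illustrative only (they are not a Spark API), and the 8-core figures are assumed numbers: the ratio of CPU time to job time, divided by the number of cores, gives a rough idea of how busy the cores were.

```python
def cpu_time_hours(workers: int, elapsed_hours: float) -> float:
    """Total busy time if every worker is busy for the whole elapsed time."""
    return workers * elapsed_hours


def core_utilization(cpu_hours: float, elapsed_hours: float, cores: int) -> float:
    """Fraction of the available core-time that was spent on CPU work."""
    return cpu_hours / (elapsed_hours * cores)


# 10 people (cores) working for 1 hour -> 10 hours of CPU time.
total = cpu_time_hours(workers=10, elapsed_hours=1.0)

# Assumed figures for an 8-core cluster and a 1-hour job.
healthy = core_utilization(cpu_hours=8.0, elapsed_hours=1.0, cores=8)  # all cores busy
skewed = core_utilization(cpu_hours=2.0, elapsed_hours=1.0, cores=8)   # most cores idle

print(total, healthy, skewed)
```

A utilization close to 1 means the cores were kept busy; a low value suggests that most cores sat idle, for example because of data skew or waiting on I/O.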
2. Is "CPU time > job time" normal?
Yes. With parallel processing, CPU time is normally longer than job time.
If you are using an 8-core cluster and CPU time is about 8 times the job time, you can say that all cores are being used efficiently, which is a healthy state.
With Delta Sharing, are there extra charges when the regions differ?
Yes. Extra charges apply, mainly cloud storage data transfer fees (egress fees).
These are not so much Databricks usage fees (DBU) as costs billed by the cloud provider (AWS, Azure, GCP) that holds the data.
When writing a valid job definition, is the content under resources YAML or JSON?
YAML.
What is rescue?
It refers to the "rescue" mode, one of the values of cloudFiles.schemaEvolutionMode, and to the "rescued data column" (_rescued_data) that it produces.
The feature does not throw away "data that did not fit the schema or was not expected"; it puts that data into a dedicated shelter (a column) to keep it safe.
When rescue mode is enabled in Auto Loader, a column named _rescued_data (in JSON format) is created in the table automatically.
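To get a feel for what ends up in _rescued_data, here is a plain-Python simulation. It is not how Auto Loader is implemented, and the function split_rescued, the expected column set, and the coupon field are invented for the example: fields that match the expected schema stay in their columns, and everything else is packed into one JSON string.

```python
import json


def split_rescued(record: dict, expected_columns: set) -> dict:
    """Keep expected columns; pack unexpected fields into a JSON string column."""
    row = {col: record.get(col) for col in sorted(expected_columns)}
    extras = {k: v for k, v in record.items() if k not in expected_columns}
    # Unexpected fields are preserved instead of being discarded.
    row["_rescued_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return row


expected = {"order_id", "amount"}
incoming = {"order_id": 101, "amount": 2500, "coupon": "SPRING"}  # "coupon" is new

row = split_rescued(incoming, expected)
print(row)
```

The table schema stays unchanged, and the unexpected coupon value can still be recovered later from the JSON string.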
To read from Kafka, is it readfrom(Kafka)?
No. Use readStream with format("kafka"):
df = (spark.readStream
      .format("kafka")                                 # 1. Set the format to kafka
      .option("kafka.bootstrap.servers", "host:port")  # 2. Server(s) to connect to
      .option("subscribe", "topic_name")               # 3. Topic name to subscribe to
      .option("startingOffsets", "earliest")           # 4. Where to start reading from
      .load())
Are audit logs CSV, JSON, or XML?
The standard format of Databricks audit logs is JSON.
What is a declarative pipeline, one of the benefits of DLT? Explained together with how it differs from the alternative
The "declarative pipeline" is the defining feature of Delta Live Tables (DLT). It is a way of thinking that changes conventional data engineering practice considerably.
In one sentence, it is a style where you describe "what you want to build (What)" rather than "how to run it (How)".
It is explained here by comparison with the contrasting concept, the "imperative pipeline".
1. "Imperative" vs "declarative"
- Imperative: A procedure: "read, transform, then save".
- Declarative: A definition: "build this table from this data".
Imperative pipeline (conventional Spark notebooks and the like)
A style where you give instructions one "step" at a time.
- Characteristics: In cooking terms, it is a recipe: "boil the water, then cook the noodles for 3 minutes, make the soup in the meantime...". You write down every step, its order, and its method.
- What the code does:
  - Start a cluster.
  - Read files from S3.
  - Transform the data.
  - Save to a Delta table.
  - Write code that retries on failure.
- Drawback: Dependencies (run B after A finishes), error handling, and resource management all have to be programmed by hand.
Declarative pipeline (DLT)
A style where you define the "finished picture".
- Characteristics: Instead of a recipe, you just order "I want a tasty bowl of ramen" (the finished state). When to boil the water and how high the heat should be are decided by the system (DLT).
- What the code says:
  - "The source is the JSON in this folder."
  - "Table A is the source, filtered."
  - "Table B is Table A, joined."
- Benefit: Execution order and infrastructure management can be left entirely to DLT.
| Item | Imperative (standard Spark) | Declarative (DLT) |
|---|---|---|
| Reading | spark.read.format("json").load(...) | dlt.read_stream("source_name") |
| Writing | df.write.format("delta").saveAsTable(...) | @dlt.table (defined with a decorator) |
| Dependencies | Controlled by notebook cell order or an external job tool | The system analyzes the code and works them out automatically |
Sample data used here
Assume raw data (JSON) from an e-commerce site like the following.
{"order_id": 101, "amount": 2500, "status": "shipped"}
{"order_id": 102, "amount": -500, "status": "error"} // ← invalid data (negative amount)
{"order_id": 103, "amount": 1200, "status": "shipped"}
Goal:
1. Bronze: Ingest the raw data as is
2. Silver: Extract only valid records whose amount is 0 or more
1. Imperative pipeline (the conventional approach)
You describe "how to process the data" step by step.
# Step 1: Read the data
df_raw = spark.read.json("/path/to/raw_sales")
# Step 2: Filtering (business logic)
df_cleaned = df_raw.filter("amount >= 0")
# Step 3: Save (path, format, and for streaming a checkpoint are each specified by hand)
df_cleaned.write.format("delta").mode("append").save("/path/to/silver_sales")
# Step 4: (left to the operator)
# Set up a job so that this notebook runs every day at 10:00.
# Configure retries on failure and manage the cluster size yourself.
[!CAUTION]
Problem: If you decide to "add a Gold table after Silver", rewriting the code is not enough. You also have to rearrange the execution order in an external orchestrator (a job tool or similar).
2. Declarative pipeline (the DLT approach)
You describe only "what you want to build" (the definition).
import dlt
# 1. Define the Bronze table (using Auto Loader)
@dlt.table
def sales_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/path/to/raw_sales")
    )
# 2. Define the Silver table (the quality check is simply declared, too)
@dlt.table
@dlt.expect_or_drop("valid_amount", "amount >= 0")  # Declare that invalid rows are dropped
def sales_silver():
    return dlt.read("sales_bronze")  # Reading from the previous table is enough to establish the dependency
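Since the dlt module is only available inside a pipeline, here is a plain-Python simulation of what the valid_amount expectation does to the three sample records. The function expect_or_drop below is a stand-in written for this note, not the DLT API: rows that fail the rule are dropped, and the number of dropped rows is counted.

```python
# The three sample records from the e-commerce example.
records = [
    {"order_id": 101, "amount": 2500, "status": "shipped"},
    {"order_id": 102, "amount": -500, "status": "error"},
    {"order_id": 103, "amount": 1200, "status": "shipped"},
]


def expect_or_drop(rows, name, predicate):
    """Keep rows that satisfy the rule and report how many were dropped."""
    kept = [row for row in rows if predicate(row)]
    metrics = {"expectation": name, "passed": len(kept), "dropped": len(rows) - len(kept)}
    return kept, metrics


silver, metrics = expect_or_drop(records, "valid_amount", lambda row: row["amount"] >= 0)

print([row["order_id"] for row in silver])
print(metrics)
```

Order 102 is the only record removed, and the count of dropped rows is the kind of number that DLT reports for each expectation.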
宣èšçã§ããããšã®ãå®å©ãã¯ããïŒ
ã¡ãªããAïŒäŸåé¢ä¿ã®èªå解決ïŒã次ã¯äœïŒããèããªããŠããïŒ
- åœä»€ç: ãAã®ä¿åãçµãã£ããBãåããããšããé åºã人éã管çããŸãã
- 宣èšç: DLTã®ã³ãŒãå
ã§
dlt.read("sales_bronze")ãšæžããç¬éã«ãã·ã¹ãã ããããå ã«BronzeãæŽæ°ããŠããSilverãåãããªãããªããšèªåã§å€æããæçã«ãŒãã§å®è¡ããŸãã
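The idea behind this automatic ordering can be sketched with a topological sort. This is only an illustration of the concept, using Python's standard graphlib module, and does not show how DLT itself is implemented; sales_gold is an assumed extra table added to make the chain longer.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it reads from.
# sales_gold is a hypothetical table added for this illustration.
dependencies = {
    "sales_bronze": set(),
    "sales_silver": {"sales_bronze"},
    "sales_gold": {"sales_silver"},
}

# static_order() yields every table after all the tables it depends on.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Adding a new table only means adding one more entry that declares what it reads from; the order is recomputed rather than maintained by hand.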
Benefit B: Making data quality visible
- Imperative: To find out how many rows the filter dropped, you have to write your own logging code.
- Declarative: Just by declaring @dlt.expect..., the Databricks pipeline UI shows how many records were dropped for violating each rule.
Benefit C: Easy schema changes and reprocessing
- Imperative: To redo the past year of data (reprocessing), you have to empty folders by hand, reset checkpoints, and so on.
- Declarative: Just press the "Full Refresh" button in the UI. The system safely rebuilds all the data according to the "current definition".
Conclusion
With declarative pipelines, engineers are freed from "data plumbing" (infrastructure management and ordering), and can spend their time on creating value: deciding "which business rules to apply to the data".