AWS GlueのETLスクリプトを作成する言語として、新たにScalaが追加されました。
画面を確認すると以下のようにPythonに加えてScalaも選択できるようになっています。
以下はScalaで自動生成されたETLスクリプトになります。
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // @type: DataSource // @args: [database = "sampledb", table_name = "vpc_flow_logs", transformation_ctx = "datasource0"] // @return: datasource0 // @inputs: [] val datasource0 = glueContext.getCatalogSource(database = "sampledb", tableName = "vpc_flow_logs", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame() // @type: ApplyMapping // @args: [mapping = [("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")], transformation_ctx = "applymapping1"] // @return: applymapping1 // @inputs: [frame = datasource0] val applymapping1 = datasource0.applyMapping(mappings = Seq(("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")), caseSensitive = false, transformationContext = "applymapping1") // @type: ResolveChoice // @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] // @return: resolvechoice2 // @inputs: [frame = applymapping1] val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice2") // @type: DropNullFields // @args: [transformation_ctx = "dropnullfields3"] // @return: dropnullfields3 // @inputs: [frame = resolvechoice2] val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3") // @type: DataSink // @args: [connection_type = "s3", connection_options = {"path": "s3://glue-test-out-bucket"}, format = "parquet", transformation_ctx = "datasink4"] // @return: datasink4 // @inputs: [frame = dropnullfields3] val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://glue-test-out-bucket"}"""), transformationContext = "datasink4", format = "parquet").writeDynamicFrame(dropnullfields3) Job.commit() } }
Sparkの利用者はPythonかScalaのどちらの言語の利用が多いんでしょうか?
いずれにせよ言語の選択肢が増えるのはすごくいいですね。
参考
AWS Glue Now Supports Scala in Addition to Python