これは何

Spark を使用してサンプルクラスターを起動し、Amazon S3 バケットに格納する単純な PySpark スクリプトを実行していきます。

S3作成

適当にS3を作成します。

フォルダを作成する

以下のフォルダを作成します。

output:ログと進行中のアウトプットを入れます。
script:PySparkスクリプト、csvファイルを入れます。

先ほど作成したオブジェクトへ行き、上記フォルダを作成します。

scriptファイルを置く

import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI where the food establishment data CSV is saved, typically
              an Amazon S3 bucket, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where the output is written, typically an Amazon S3
                       bucket, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("SELECT name, count(*) AS total_red_violations " +
          "FROM restaurant_violations " +
          "WHERE violation_type = 'RED' " +
          "GROUP BY name " +
          "ORDER BY total_red_violations DESC LIMIT 10 ")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI where the CSV restaurant data is saved, typically an S3 bucket.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, typically an S3 bucket.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)

このpyファイルを、S3コンソールで、scriptフォルダにアップロードします。

また、food_establishment_data.zipファイルをダウンロードします。

コンテンツを解凍し、food_establishment_data.csv としてローカルに保存します。
これも同様に、scriptフォルダにアップロードします。

EMR作成

デフォルトで3つのノード(1つのマスターノードと2つのコアノード)を設定されます。
実行時に、マスターノードはデータセットの処理を処理するコアノードにワークロードを分散します。

EMR クラスターを調整

[Steps (ステップ)]、[Add step (ステップの追加)] の順に選択します。

アプリケーションの場所:S3に入れたPySparkスクリプトの場所
引数:
--data_source s3://作成したバケット名/script/food_establishment_data.csv
--output_uri s3://作成したバケット名/output

結果の表示

ステップが正常に実行されると、ステップの送信時に指定した Amazon S3 出力フォルダに出力結果を表示できます。

次の項目が出力フォルダにあることを確認します。

_SUCCESS
ステップの成功を示します。
CSVファイルpart-***
結果を持つオブジェクトです。

なんと、pyファイルは、出力ファイルに、最も赤い違反が多い食品施設の上位 10 件がリストするものでした(!?)
面白いです。

EMR リソースのクリーンアップ

EMRを終了するを押下します。
5 ~ 10 分かかる場合があります。

S3 リソースを削除します。
バケットを削除する前に、クラスターを完全にシャットダウンする必要があります。そうしないと、バケットを空にしようとすると問題が発生する可能性があります。

総括

最初の Amazon EMR クラスターを最初から最後まで起動し、ビッグデータアプリケーションの準備と送信、結果の表示、クラスターのシャットダウンなど、EMR 諸々を実行しました。

なんとなくイメージを掴むにはハンズオンが一番ですね。

今回はAWSから提供されているファイルを使用しましたが、COVID-19 Dataset by Our World in DataというGit Hubから、データを使って分析してみるのも面白いかもしれません。

参考
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/emr-gs.html
https://github.com/owid/covid-19-data

元記事はこちら

https://qiita.com/namely_/items/b9027dcd72710905bfc9