本記事では、Google Cloud Storage にアップロードされた動画を、Cloud Run functions と FFmpeg を使用して3分ごとに分割し、別のバケットに保存する手順を紹介します。
処理の流れは以下の通りです。
1. 特定のCloud Storage バケット(入力用バケット)に動画ファイルがアップロードされる
2. アップロードをトリガーとして Cloud Run functions が自動起動する
3. Cloud Run functions が FFmpeg を使用して動画を3分ごとに分割する
4. 分割された各動画ファイルを別の Cloud Storage バケット(出力用バケット)にアップロードする
Cloud Storage バケットの作成
gcloud コマンドを使用して、2つのバケットを作成します。
BUCKET_NAME_INPUT:分割前の動画をアップロードするバケットBUCKET_NAME_OUTPUT:分割後の動画を保存するバケット
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME_INPUT="BUCKET_NAME_INPUT"
BUCKET_NAME_OUTPUT="BUCKET_NAME_OUTPUT"
gcloud storage buckets create gs://${BUCKET_NAME_INPUT} \
   --project=${PROJECT_ID}  \
   --location=asia-northeast1
gcloud storage buckets create gs://${BUCKET_NAME_OUTPUT} \
   --project=${PROJECT_ID}  \
   --location=asia-northeast1
APIの有効化
以下のコマンドで必要な API を有効化します。
gcloud services enable \ artifactregistry.googleapis.com \ run.googleapis.com \ cloudbuild.googleapis.com \ storage.googleapis.com \ cloudfunctions.googleapis.com \ eventarc.googleapis.com \ pubsub.googleapis.com
artifactregistry.googleapis.com:関数のコンテナイメージを保存するために使用されます。run.googleapis.com:Cloud Run functions の実行基盤として Cloud Run を利用します。cloudbuild.googleapis.com:ソースコードのビルドとコンテナイメージの作成に使用されます。storage.googleapis.com:Cloud Storage へのアクセスに必要です。cloudfunctions.googleapis.com:Cloud Run functions サービスを利用するために必要です。eventarc.googleapis.com:Cloud Storage イベントをトリガーとして Cloud Run functions を起動するために Eventarc を利用します。pubsub.googleapis.com: Eventarc が Cloud Storage イベントを Pub/Sub 経由で Cloud Run functions に通知するために使用されます。
サービスエージェントへの IAM 権限付与
Cloud Storage サービスが Pub/Sub トピックにイベント通知をパブリッシュできるように、Cloud Storage のサービスエージェントに必要な IAM ロールを付与します。
PROJECT_ID=$(gcloud config get-value project)
SERVICE_AGENT="$(gcloud storage service-agent --project=${PROJECT_ID})"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${SERVICE_AGENT}" \
    --role='roles/pubsub.publisher'
これにより、Cloud Storage バケットで発生したイベントを Pub/Sub トピックに発行できるようになります。Eventarc はこの Pub/Sub トピックを介してイベントを検知します。
サービスアカウントの作成
Cloud Run functions が Cloud Storage にアクセスしたり、Eventarc からイベントを受け取ったりするために、専用のサービスアカウントを作成し、必要な権限を付与します。
PROJECT_ID=$(gcloud config get-value project)
SERVICE_ACCOUNT_NAME="gcs-cloud-run"
gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} --project=${PROJECT_ID}
# Cloud Storage のアクセス権限
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
   --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
   --role="roles/storage.objectUser"
# Cloud Run の呼び出し権限
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
   --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
   --role="roles/run.invoker"
# Eventarc トリガーの権限
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
   --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
   --role="roles/eventarc.eventReceiver"
Cloud Run functions のソースコード作成
次に、Cloud Storage バケットから動画をダウンロードし、3分ごとに分割して再度アップロードする関数を作成します。
作業用のディレクトリを作成し、その中に以下の2つのファイルを作成します。
mkdir video-split-function cd video-split-function
requirements.txt
関数が使用する Python ライブラリを記述します。
functions-framework==3.* google-cloud-storage
main.py
動画処理のロジックを記述します。
import os
import tempfile
import subprocess
import functions_framework
from google.cloud import storage
# 環境変数
INPUT_BUCKET = os.environ.get('INPUT_BUCKET')
OUTPUT_BUCKET = os.environ.get('OUTPUT_BUCKET')
SEGMENT_DURATION_SEC = 180
# Cloud Storage クライアントの初期化
storage_client = storage.Client()
@functions_framework.cloud_event
def main(cloud_event):
    """
    Cloud Storage に新しい動画がアップロードされたときに実行される関数
    動画が3分以上の場合、3分ごとに分割して別のバケットに保存する
    """
    # イベントからファイル情報を取得
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]
    # 動画ファイルのみ処理
    if not file_name.lower().endswith(('.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv')):
        return
    # 一時ディレクトリを作成
    with tempfile.TemporaryDirectory() as temp_dir:
        # 入力ファイルパス
        input_path = os.path.join(temp_dir, file_name)
        # Cloud Storage から動画をダウンロード
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)
        blob.download_to_filename(input_path)
        # FFprobe を使って動画の長さを取得
        duration = get_video_duration(input_path)
        # 動画が3分未満なら処理せずに終了
        if duration < SEGMENT_DURATION_SEC:
            return
        # 3分ごとに分割
        output_directory = os.path.join(temp_dir, "output")
        os.makedirs(output_directory, exist_ok=True)
        # ファイル名から拡張子を取得
        file_name_without_ext, extension = os.path.splitext(file_name)
        # 出力セグメントのテンプレート
        segment_template = os.path.join(output_directory, f"{file_name_without_ext}_part_%02d{extension}")
        # FFmpeg で動画を分割
        split_video(input_path, segment_template)
        # 分割されたファイルを Cloud Storage にアップロード
        output_bucket = storage_client.bucket(OUTPUT_BUCKET)
        for segment_file in os.listdir(output_directory):
            segment_path = os.path.join(output_directory, segment_file)
            output_blob = output_bucket.blob(segment_file)
            output_blob.upload_from_filename(segment_path)
def get_video_duration(video_path):
    """FFprobe を使用して動画の長さを秒単位で取得"""
    command = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'csv=p=0',
        video_path
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    # 戻り値を浮動小数点数に変換
    return float(result.stdout.strip())
def split_video(input_path, segment_template):
    """FFmpeg を使用して動画を3分ごとに分割"""
    command = [
        'ffmpeg',
        '-i', input_path,
        '-c', 'copy',
        '-map', '0',
        '-f', 'segment',
        '-segment_time', '180',
        '-segment_start_number', '1',
        '-reset_timestamps', '1',
        segment_template
    ]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
Cloud Run functions の Python ランタイムには、FFmpeg が標準で含まれています。(参考)
よって、別途FFmpegをデプロイパッケージに含める必要はありません。
FFprobe
get_video_duration 関数内で実行している ffprobe コマンドは以下の通りです。
ffprobe -v error -show_entries format=duration -of csv=p=0 <video_path>
ffprobe:動画の情報を表示・分析します。-v error:ffprobe のログ出力レベルを error に設定し、不要な情報を減らします。-show_entries format=duration:動画ファイル全体の情報の中から、動画の長さだけを取得します。-of csv=p=0:出力形式を CSV 形式に設定し、プレフィックスを表示しないようにします。video_path:動画ファイルのパスが入ります。
FFmpeg
split_video 関数内で実行している ffmpeg コマンドは以下の通りです。
ffmpeg -i <input_path> -c copy -map 0 -f segment -segment_time 180 -segment_start_number 1 -reset_timestamps 1 <segment_template>
ffmpeg:音声・動画の形式変換や編集を行います。-i input_path:入力ファイルを指定します。-c copy:コーデックの指定です。copy を指定すると再エンコードを行わないため、高速処理かつ品質劣化なしで分割できます。-map 0:入力ファイルのすべてのストリーム(映像、音声、字幕など)を出力に含めます-f segment:時間やフレーム数指定でストリームを分割します。-segment_time 180:各セグメントの長さを 180 秒に設定します。-segment_start_number 1:出力ファイル名の連番を 1 から開始します。-reset_timestamps 1:各セグメントのタイムスタンプを 0 から開始するようにリセットします。これを指定しないと先頭に空白が入った動画ファイルが生成されます。segment_template:出力ファイル名のテンプレートを指定します。
デプロイ
Cloud Run functions をデプロイします。
main.py と requirements.txt があるディレクトリで以下のコマンドを実行してください。
PROJECT_ID=$(gcloud config get-value project)
INPUT_BUCKET="INPUT_BUCKET"
OUTPUT_BUCKET="OUTPUT_BUCKET"
SERVICE_ACCOUNT_NAME="gcs-cloud-run"
FUNCTION_NAME="video-split-function"
gcloud functions deploy ${FUNCTION_NAME} \
--gen2 \
--project=${PROJECT_ID} \
--region=asia-northeast1 \
--runtime=python312 \
--memory=512Mi \
--entry-point=main \
--trigger-bucket=${INPUT_BUCKET} \
--service-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
--set-env-vars INPUT_BUCKET=${INPUT_BUCKET},OUTPUT_BUCKET=${OUTPUT_BUCKET}
このコマンドで指定しているオプションは下記です。
--gen2:第 2 世代の Cloud Functions を使用--project:デプロイ先のプロジェクトID--region:デプロイするリージョン--runtime: Python のランタイムバージョン--memory:関数に割り当てるメモリ--entry-point:イベント発生時に最初に呼び出される関数名--trigger-bucket:関数をトリガーする Cloud Storage バケットの名前--set-env-vars:関数内で使用する環境変数
動作確認
入力用のバケットに3分以上の長さの動画ファイル(今回は10分)をアップロードします。

処理が正常に完了すると、3分ごとに分割された動画ファイルが出力用のバケットに保存されます。
もし、動画が3分未満の場合は、出力バケットには何も生成されません。

さいごに
今回は、Google Cloud Storage にアップロードされた動画を、Cloud Run functions と FFmpeg を使って自動的に分割し、別のバケットに保存する方法を紹介しました。
本記事がお役に立てれば幸いです。
最後まで読んでいただきありがとうございました。