はじめに

以下の記事では Well-Architected IaC Analyzer というツールを取り上げ、デプロイ時の注意点や実際使ってみての所感などを紹介しました。

今回はこのツールを魔改造してさらに使いやすくしました。プルリクエストを出したり独自機能を盛り込んだりと、学びが多かったので改めて紹介したいと思います。

課題

このツール、社内利用であれば十分に活用できるのですが、以下の点で痒いところに手が届きませんでした。

  • ワークロード ID のバリデーションがなく、処理が進んでからエラーになり体験があまりよくない
  • パブリックにデプロイすると認証なしで HTTP 80 を 0.0.0.0/0 で公開してしまう
  • 別アカウントのワークロードに対して解析できない

そこで、修正できそうな内容であれば自前で修正し、せっかくの機会なのでプルリクエストを出してみることにしました。

ワークロード ID のバリデーション

最初にこんな PR を出しました。Streamlit のコードに以下のようなヘルパー関数を追加しています。

ワークロード ID の形式を確認

ワークロード ID が 32 桁の 16 進数かどうかをチェックします。

# Helper function to check if the given ID is a 32 hex digits
def is_valid_workload_id_format(workload_id_input):
    pattern = re.compile(r"^[a-fA-F0-9]{32}$")
    if pattern.match(workload_id_input):
        return True
    else:
        st.error("Invalid Workload ID, must be 32 hex digits.")
        return False

ワークロードにアクセスできるか確認

GetWorkload API でワークロードにアクセスできるかチェックします。

# Helper function to check if the workload with the specified ID exists
def is_exists_workload_id(workload_id_input):
    try:
        response = wa_client.get_workload(WorkloadId=workload_id_input)
        if response:
            return True
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        st.error(f"AWS Error: {error_code} - {error_message}")
        if error_code == "ValidationException":
            st.error("Please check if the WorkloadId is correct.")
        elif error_code == "ResourceNotFoundException":
            st.error("The specified workload was not found.")
        elif error_code == "AccessDeniedException":
            st.error(
                "You don't have permission to perform this operation. Check your IAM policies."
            )
        else:
            st.error("Please check your AWS credentials and permissions.")
    except Exception as e:
        st.error(f"Unexpected error: {str(e)}")
    return False

[Review Uploaded Document] ボタンのクリック時、これらの関数が最初に呼ばれるようにします。

...
with st.spinner("Validating Workload ID..."):
    if not is_valid_workload_id_format(workload_id_input):
        return None
    if not is_exists_workload_id(workload_id_input):
        return None
...

IP アドレスによるアクセス制限

CDK のコードに対しても PR を出しました。config.iniallowed_ips というキーを追加し、設定された値以外の IP アドレスからのアクセスを拒否するようにします。

設定ファイルのパース

config.ini をパースするコードに IP アドレスの取得ロジックを追加します。

     # Read config.ini
     config = configparser.ConfigParser()
     config.read("config.ini")
     model_id = config["settings"]["model_id"]
     if config["settings"]["public_load_balancer"] == "False":
         public_lb = False
     else:
         public_lb = True
+    allowed_ips = config["settings"]["allowed_ips"]
+    if allowed_ips:
+        allowed_ips = allowed_ips.split(",")
+    else:
+        allowed_ips = ["0.0.0.0/0"]

Streamlit をホストする ECS はもっとも抽象的な L3 コンストラクトで定義されており、public_load_balancer プロパティに真偽値を渡すだけで internal あるいは internet-facing いずれかの ALB が自動で作成されます。True を渡すと HTTP 80 が 0.0.0.0/0 で解放されてしまうため、この挙動をなんとかします。

# Create Fargate Service with private ALB
fargate_service = ecs_patterns.ApplicationLoadBalancedFargateService(
    self,
    "StreamlitAppService",
    cluster=ecs_cluster,
    runtime_platform=ecs.RuntimePlatform(
        operating_system_family=ecs.OperatingSystemFamily.LINUX,
        cpu_architecture=architecture,
    ),
    task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
        image=ecs.ContainerImage.from_docker_image_asset(imageAsset),
        container_port=8501,
        task_role=app_execute_role,
        environment={
            "IAC_TEMPLATE_S3_BUCKET": UPLOAD_BUCKET_NAME,
            "WA_DOCS_S3_BUCKET": WA_DOCS_BUCKET_NAME,
            "KNOWLEDGE_BASE_ID": KB_ID,
            "MODEL_ID": model_id,
        },
    ),
    task_subnets=ec2.SubnetSelection(
        subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
    ),
    public_load_balancer=public_lb, # <= これ
)

public_load_balancer を使わずに、自前で定義した ALB を load_balancer プロパティで明示的に渡してやります。まずセキュリティグループと ALB を作り、

# Create Security Group for ALB
ecs_security_group = ec2.SecurityGroup(
    self,
    "StreamlitAppSecurityGroup",
    vpc=vpc,
    description="Allow traffic to Fargate service from allowed ips",
    allow_all_outbound=False,
)

# Create ALB
ecs_loadbalancer = elasticloadbalancingv2.ApplicationLoadBalancer(
    self,
    "StreamlitAppLoadBalancer",
    security_group=ecs_security_group,
    vpc=vpc,
    internet_facing=public_lb,
)

その ALB を ECS に渡します。

# Create Fargate Service with private ALB
fargate_service = ecs_patterns.ApplicationLoadBalancedFargateService(
    self,
    "StreamlitAppService",
    ...
    load_balancer=ecs_loadbalancer, # <= これ
)

そして渡した ALB のセキュリティグループに対して、エスケープハッチでインバウンドのエントリを書き換えます。

# Escape hatch for allowed ip addresses
if allowed_ips != ["0.0.0.0/0"]:
    for i, allowed_ip in enumerate(allowed_ips):
        ecs_security_group.node.default_child.add_override(
            f"Properties.SecurityGroupIngress.{i}.CidrIp", allowed_ip
        )
        ecs_security_group.node.default_child.add_override(
            f"Properties.SecurityGroupIngress.{i}.Description",
            "Allow HTTP traffic from specified ip",
        )
        ecs_security_group.node.default_child.add_override(
            f"Properties.SecurityGroupIngress.{i}.FromPort",
            "80",
        )
        ecs_security_group.node.default_child.add_override(
            f"Properties.SecurityGroupIngress.{i}.IpProtocol",
            "tcp",
        )
        ecs_security_group.node.default_child.add_override(
            f"Properties.SecurityGroupIngress.{i}.ToPort",
            "80",
        )

これで、CDK のデプロイ時に設定ファイルに基づいて IP 制限が適用されるようになりました。

クロスアカウント対応

PR を出してみても、すぐにレスポンスをいただけるとは限りません。今回もリポジトリがあまり活発でなくレスポンスがない状態です。とはいえ社内ではより便利に使いたいので、フォーク先のコードを更新してこまめにデプロイしていこうと考えました。

冒頭で取り上げた課題では、クロスアカウント対応がもっとも利便性に寄与します。別アカウントのワークロードを直接更新できるようになれば、さらに便利なツールに進化しそうです。とうわけで、クロスアカウントのサポートについても実装してみました。

変更内容

何をどう変更するかを整理します。

  • ワークロード ID ではなく ARN を受け取るように入力フォームを修正
  • ワークロード ARN をパースして ID とリージョン名を抽出する関数を追加
  • ロール ARN を受け取るための入力フォームを追加
  • ロール ARN の形式をチェックするための関数を追加
  • ロールを引き受けてセッションを作る関数を追加
  • ワークロード ID と Well-Architected のクライアントを Streamlit の session_state で管理

フォーマッターによる書式変更や変数名の変更も入っているため少し煩雑ですが、こちらが該当のコミットです。

ワークロード ID ではなく ARN を受け取るようにする

リージョン名が必要なため、ID ではなく ARN を受け取るようにします。そして入力値からワークロード ID とリージョン名を抽出します。

# Helper function to check if the given workload ARN is valid and if so, get the ID and region
def extract_id_and_region_from_arn(workload_arn_input):
    pattern = re.compile(
        r"^arn:aws:wellarchitected:([a-z0-9-]+):\d{12}:workload/([a-fA-F0-9]{32})$"
    )
    match = pattern.match(workload_arn_input)
    if match:
        workload_id = match.group(2)
        region = match.group(1)
        return workload_id, region
    else:
        st.error("Invalid workload ARN format.")
        return None, None

ロール ARN を入力から受け取るためのフォームを追加します。これはワークロードがツールと同じアカウントに存在する場合は不要です。

role_arn_input = st.text_input(
    "If you are handling workloads for another account, enter the IAM role ARN to assume that role. (Optional)",
    "",
)

受け取った ARN の形式を以下の関数でチェックします。

# Helper function to check if the given role ARN is valid
def is_valid_role_arn(role_arn_input):
    pattern = re.compile(r"^arn:aws:iam::\d{12}:role/[a-zA-Z_0-9+=,.@-]+$")
    if pattern.match(role_arn_input):
        return True
    else:
        st.error("Invalid role ARN format.")
        return False

Assume Role が肝となります。以下のような関数でセッションを取得します。

# Create a session with the assumed role credentials
def get_session(role_arn_input):
    try:
        assumed_role = sts_client.assume_role(
            RoleArn=role_arn_input, RoleSessionName="assume_role_session"
        )
        credentials = assumed_role["Credentials"]
        session = boto3.Session(
            aws_access_key_id=credentials["AccessKeyId"],
            aws_secret_access_key=credentials["SecretAccessKey"],
            aws_session_token=credentials["SessionToken"],
        )
        return session
    except Exception as e:
        st.error(f"Error: {str(e)}")
    return None

Well-Architected のクライアントとワークロード ID は UI 上のどの機能においても一貫した値を使わなければなりません。そのための箱として、以下のように session_state を初期化します。wa_client はコードの冒頭でグローバルに宣言しているため、global キーワードを使って再代入します。

# Main App
def main():
    st.title(":orange[Are you Well-Architected?]")

    ...

    # Initialize session state variables
    global wa_client
    if "wa_client" not in st.session_state:
        st.session_state.wa_client = wa_client
    if "workload_id" not in st.session_state:
        st.session_state.workload_id = ""
    ...

これら一連の処理を [Review Uploaded Document] ボタンをクリックした際に実行されるようにします。

with st.spinner("Validating your input..."):
    workload_arn_input = workload_arn_input.strip()
    workload_id, region = extract_id_and_region_from_arn(workload_arn_input)
    if workload_id is None or region is None:
        return None
    st.session_state.workload_id = workload_id

    if role_arn_input and role_arn_input != "":
        role_arn_input = role_arn_input.strip()
        if is_valid_role_arn(role_arn_input):
            session = get_session(role_arn_input)
            if not session:
                return None
            wa_client = session.client(
                "wellarchitected", region_name=region
            )
            st.session_state.wa_client = wa_client
        else:
            return None

    if not is_exists_workload(workload_id):
        return None

以下のような画面になります。ロール ARN を入力するフォームが追加されていることがわかります。

ロールに必要な権限

信頼ポリシーは以下のように通常の Assume Role のものです。今回は外部 ID の実装まではしていないためご注意ください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<account-id>:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}

許可ポリシーは以下の通りです。実際は Resource も定義してよりセキュアにすべきです。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "wellarchitected:ListAnswers",
                "wellarchitected:GetLensReview",
                "wellarchitected:GetLensReviewReport",
                "wellarchitected:GetWorkload",
                "wellarchitected:CreateMilestone",
                "wellarchitected:UpdateAnswer"
            ],
            "Resource": "*"
        }
    ]
}

注意点

[Review Uploaded Document] ボタンと [Complete Well-Architected Review] ボタンはセッション内でのクリック回数がカウントされており、2 回目以降はキャッシュを再利用します。これは元々の挙動です。

1 回目実行時とは異なるワークロード ARN やロール ARN で再実行すると、入力値に不整合が発生して ResourceNotFound が発生する場合があります。これについては一度ブラウザをリロードしてから実行することで正しい結果を取得できます。

おわりに

プルリクエストを出してみたり、フォークして魔改造してみたり、自分の中で OSS に対する最初の壁が取っ払われたのを感じます。みなさんも自身が使っている OSS になんらかの貢献をしてみてはいかがでしょうか。ドキュメント修正や typo 修正のような小さい内容でもメンテナーはきっと嬉しいはずです。