やりたいこと
担当案件でRDS(MYSQL)への接続が必要となったが、用途や接続先により接続方法が異なったため、備忘録として残しておく。
前提
- SQL Alchemy や Alembic に関する説明やコマンド等は割愛
- 前者がORM、後者はマイグレーションツール
- プロジェクトにはAlembicで使用可能な Model は用意されているものとする
環境
- Python: 3.11
- RDS: Aurora MySQL 3.03.1 (compatible with MySQL 8.0.26)
- SQL Alchemy: 2.0.16
- ALembic: 1.11.1
- sshtunnel: 0.4.0
ケース1: ローカル環境からの接続
開発環境としてローカルからAWSのRDSへアクセスする。
APIの実行およびDBのマイグレーションのいずれも行う。
RDSはpublicからのアクセスを許容している状態とする。
↓ 接続用コード
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
dialect = "mysql"
driver = "pymysql"
username = DBのユーザー名
password = DBのパスワード
host = DBのエンドポイント
port = DBのポート番号
database = データベース名
charset_type = "utf8mb4"
# basic認証
def get_connection():
    db_url = f"{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}?charset={charset_type}"
    engine = create_engine(db_url, echo=True, pool_pre_ping=True)
    self.session_local = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=engine)
    )
     return (declarative_base(), None)
def get_session(self):
        return session_local()
↓ Alembic.ini
sqlalchemy.url = mysql+pymysql://DBのユーザー名:DBのパスワード@DBのエンドポイント:DBのポート番号/データベース名?charset=utf8mb4
ここは特に難しいこともなく一般的な接続でOK.
ケース2: AWS(Lambda)からの接続
AWS 上にデプロイされた Lambda から RDS へのアクセス方法。
認証情報は SecretsManager に保存しているため、ソース上からは取り除いている。
このケースはAPIの実行による接続のみで、マイグレーションは行わいので、Alembic.iniは不要。
↓ 接続用コード
import os
import json
import boto3
from app.core.config import get_app_settings
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event
from typing import Dict
from botocore.exceptions import ClientError
# AWS認証
def get_connection():
    secret_name = Secrets Managerのキー名
    region_name = AWSのリージョン
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        raise e
    secret = get_secret_value_response["SecretString"]
    values: Dict = json.loads(secret)
    # 認証情報は Secrets Manager から取得する
    dialect = "mysql"
    driver = "pymysql"
    username = values[Secrets Managerから取得した設定一覧にあるDBのユーザー名]
    host = values[Secrets Managerから取得した設定一覧にあるDBのエンドポイント]
    port = values[Secrets Managerから取得した設定一覧にあるDBのポート番号]
    database = DBのデータベース名
    charset_type = "utf8mb4"
    db_url = f"{dialect}+{driver}://{username}@{host}:{port}/{database}?charset={charset_type}"
    engine = create_engine(db_url, echo=True, pool_pre_ping=True)
    # TLS用にpemファイルを認証に使用
    @event.listens_for(engine, "do_connect")
    def provide_token(dialect, conn_rec, cargs, cparams):
        rds_client = boto3.client("rds")
        cparams["ssl"] = {"ca": os.getcwd() + "/ssh/AmazonRootCA1.pem"}
        cparams["password"] = rds_client.generate_db_auth_token(
            DBHostname=host, Port=port, DBUsername=username, Region=region_name
        )
    session_local = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=engine)
    )
     return (declarative_base(), None)
def get_session(self):
        return session_local()
ポイントは def provide_token() のあたり。
LambdaからRDSへのアクセスはSSL通信を行う必要がある。
provide_token を追加することで、証明書の検証フローが追加される。
pemファイルは下記のURLからダウンロード可能なので、プロジェクトに組み込んでおくとよい。
↓ 参考
https://stackoverflow.com/questions/69447368/connecting-to-mysql-using-a-token
https://docs.aws.amazon.com/ja_jp/dms/latest/userguide/CHAP_Security.SSL.html
↓ 中間 CA 証明書
https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
ケース3: EC2を踏み台とした接続
実環境のRDSがAWSのPrivate Subnetにいるため、外部からは直接アクセス出来ないのはよくある話。
これではローカルからAlembicを使ったマイグレーションが出来ず、DBの構築・設定変更が出来ない。
回避策として EC2 を立ち上げ、踏み台サーバーとして利用する。
各種DBのクライアントツール(MYSQL Workbenchとか)にもある設定。
これをソースコード上でやってみる。
RDSはEC2からのアクセスをセキュリティグループのインバウンドにて許容する。
外部からはSSH経由でEC2にアクセスし、EC2を踏み台としてRDSを操作する。
(SSHはMUSTではないけど、ついでに)

↓ 接続用コード
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
from sshtunnel import SSHTunnelForwarder
SSH_PKEY = EC2にSSH接続する際の秘密鍵
SSH_ADDRESS = EC2のIPアドレス
SSH_PORT = 22
SSH_USERNAME = EC2のユーザー名
SSH_REMOTE_BIND_ADDRESS = DBのエンドポイント
LOCAL_HOST = "127.0.0.1"
LOCAL_PORT = ローカルPC側のポート(1~65535で任意。well knownポートがあるので実質1024~)
DB_USERNAME = DBのユーザー名
DB_PASSWORD = DBのパスワード
DB_PORT = DBのポート番号
DATABASE_NAME = データベース名
##############################################################
# 実行前にEC2に登録した鍵(id_rsa) を sshフォルダに置く
##############################################################
# SSH Tunnel認証
def get_connection():
    server = SSHTunnelForwarder(
        (SSH_ADDRESS, SSH_PORT),
        ssh_username=SSH_USERNAME,
        ssh_pkey=SSH_PKEY,
        local_bind_address=(LOCAL_HOST, LOCAL_PORT),
        remote_bind_address=(SSH_REMOTE_BIND_ADDRESS, DB_PORT),
        allow_agent=False,
    )
    server.start()
    db_url = f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{LOCAL_HOST}:{LOCAL_PORT}/{DATABASE_NAME}"
    engine = create_engine(db_url, echo=True)
    session_local = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=engine)
    )
    return (declarative_base(), server)
def get_session(self):
        return session_local()
↓ Alembic.ini
sqlalchemy.url = mysql+pymysql://DB_USERNAME:DB_PASSWORD@127.0.0.1:LOCAL_PORT/DATABASE_NAME?charset=utf8mb4
create_engine() の前に踏み台サーバーアクセスのためのひと手間加えるだけでよい。
SQLAlchemy のみであれば LOCAL_PORT は指定せず、 SSHTunnelForwarder に任せることも可能だが、
Alembicで指定する必要があるため、固定値にする必要がある。
あとは EC2 / RDS の設定値をのそのままセットすればよい。
EC2へはSSHでアクセスするため、事前に鍵ペアの生成と公開鍵のEC2への登録が必要となる。
SSH_PKEY は秘密鍵(id_rsa)のパスを指定する。
今回の案件では鍵の有効期間が1分なので、鍵の登録~実行がわちゃわちゃする。
(ここを自動化しておけばよかった)
おまけ: ケースごとの切り替え方法
ローカルか実環境か、などを実行時に意識しなくてよいよう、環境設定に連動するようにする。
最近すっかり聞かなくなったデザインパターンからFactory Methodをちょっとだけ流用。
抽象クラスを用意して、上記の3パターンでそれぞれ抽象クラスを継承したクラスを実装。
DBを操作したいレイヤー用に下記クラスを用意し、外側からは get_db() だけ呼べばあとはよろしくやってくれるようにしておく。
DB = None
SessionLocal = None
Server = None
# DB種別取得
TYPE = 環境設定.db_type
# 種別ごとにインスタンスを切り替える
if TYPE == ケース1(ローカル実行):
    DB = DbAws()
elif TYPE == ケース2(AWS実行):
    DB = DbBasic()
elif TYPE == ケース3(踏み台アクセス):
    DB = DbSshTunnel()
else:
    raise Exception
Base, Server = DB.get_connection()
SessionLocal = DB.get_session()
def get_db():
    try:
        yield SessionLocal
    finally:
        SessionLocal.close()
        if Server is not None:
            Server.close()
注意点としては、sshtunnelは2023/12時点では python 3.11 に対応していないので、コメントアウトしておく必要がある。。。
最後に
用途や接続先によってやり方が微妙に異なるのはよくある話。
こういうのを全て網羅したpip のモジュールがあってもよさそうだけど、ない。
RDSにアクセスする以上ついて回る話なので、今後に活かしていく。
 
     
    ![Oracle のロックされているテーブルのセッションを知りたい [cloudpack OSAKA blog]](https://iret.media/wp-content/uploads/2015/07/eyecatch-aws-2-220x116.png)