やりたいこと
担当案件でRDS(MYSQL)への接続が必要となったが、用途や接続先により接続方法が異なったため、備忘録として残しておく。
前提
- SQL Alchemy や Alembic に関する説明やコマンド等は割愛
- 前者がORM、後者はマイグレーションツール
- プロジェクトにはAlembicで使用可能な Model は用意されているものとする
環境
- Python: 3.11
- RDS: Aurora MySQL 3.03.1 (compatible with MySQL 8.0.26)
- SQL Alchemy: 2.0.16
- ALembic: 1.11.1
- sshtunnel: 0.4.0
ケース1: ローカル環境からの接続
開発環境としてローカルからAWSのRDSへアクセスする。
APIの実行およびDBのマイグレーションのいずれも行う。
RDSはpublicからのアクセスを許容している状態とする。
↓ 接続用コード
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.ext.declarative import declarative_base dialect = "mysql" driver = "pymysql" username = DBのユーザー名 password = DBのパスワード host = DBのエンドポイント port = DBのポート番号 database = データベース名 charset_type = "utf8mb4" # basic認証 def get_connection(): db_url = f"{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}?charset={charset_type}" engine = create_engine(db_url, echo=True, pool_pre_ping=True) self.session_local = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=engine) ) return (declarative_base(), None) def get_session(self): return session_local()
↓ Alembic.ini
sqlalchemy.url = mysql+pymysql://DBのユーザー名:DBのパスワード@DBのエンドポイント:DBのポート番号/データベース名?charset=utf8mb4
ここは特に難しいこともなく一般的な接続でOK.
ケース2: AWS(Lambda)からの接続
AWS 上にデプロイされた Lambda から RDS へのアクセス方法。
認証情報は SecretsManager に保存しているため、ソース上からは取り除いている。
このケースはAPIの実行による接続のみで、マイグレーションは行わいので、Alembic.iniは不要。
↓ 接続用コード
import os import json import boto3 from app.core.config import get_app_settings from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import event from typing import Dict from botocore.exceptions import ClientError # AWS認証 def get_connection(): secret_name = Secrets Managerのキー名 region_name = AWSのリージョン session = boto3.session.Session() client = session.client(service_name="secretsmanager", region_name=region_name) try: get_secret_value_response = client.get_secret_value(SecretId=secret_name) except ClientError as e: raise e secret = get_secret_value_response["SecretString"] values: Dict = json.loads(secret) # 認証情報は Secrets Manager から取得する dialect = "mysql" driver = "pymysql" username = values[Secrets Managerから取得した設定一覧にあるDBのユーザー名] host = values[Secrets Managerから取得した設定一覧にあるDBのエンドポイント] port = values[Secrets Managerから取得した設定一覧にあるDBのポート番号] database = DBのデータベース名 charset_type = "utf8mb4" db_url = f"{dialect}+{driver}://{username}@{host}:{port}/{database}?charset={charset_type}" engine = create_engine(db_url, echo=True, pool_pre_ping=True) # TLS用にpemファイルを認証に使用 @event.listens_for(engine, "do_connect") def provide_token(dialect, conn_rec, cargs, cparams): rds_client = boto3.client("rds") cparams["ssl"] = {"ca": os.getcwd() + "/ssh/AmazonRootCA1.pem"} cparams["password"] = rds_client.generate_db_auth_token( DBHostname=host, Port=port, DBUsername=username, Region=region_name ) session_local = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=engine) ) return (declarative_base(), None) def get_session(self): return session_local()
ポイントは def provide_token() のあたり。
LambdaからRDSへのアクセスはSSL通信を行う必要がある。
provide_token を追加することで、証明書の検証フローが追加される。
pemファイルは下記のURLからダウンロード可能なので、プロジェクトに組み込んでおくとよい。
↓ 参考
https://stackoverflow.com/questions/69447368/connecting-to-mysql-using-a-token
https://docs.aws.amazon.com/ja_jp/dms/latest/userguide/CHAP_Security.SSL.html
↓ 中間 CA 証明書
https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
ケース3: EC2を踏み台とした接続
実環境のRDSがAWSのPrivate Subnetにいるため、外部からは直接アクセス出来ないのはよくある話。
これではローカルからAlembicを使ったマイグレーションが出来ず、DBの構築・設定変更が出来ない。
回避策として EC2 を立ち上げ、踏み台サーバーとして利用する。
各種DBのクライアントツール(MYSQL Workbenchとか)にもある設定。
これをソースコード上でやってみる。
RDSはEC2からのアクセスをセキュリティグループのインバウンドにて許容する。
外部からはSSH経由でEC2にアクセスし、EC2を踏み台としてRDSを操作する。
(SSHはMUSTではないけど、ついでに)
↓ 接続用コード
import os from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.ext.declarative import declarative_base from sshtunnel import SSHTunnelForwarder SSH_PKEY = EC2にSSH接続する際の秘密鍵 SSH_ADDRESS = EC2のIPアドレス SSH_PORT = 22 SSH_USERNAME = EC2のユーザー名 SSH_REMOTE_BIND_ADDRESS = DBのエンドポイント LOCAL_HOST = "127.0.0.1" LOCAL_PORT = ローカルPC側のポート(1~65535で任意。well knownポートがあるので実質1024~) DB_USERNAME = DBのユーザー名 DB_PASSWORD = DBのパスワード DB_PORT = DBのポート番号 DATABASE_NAME = データベース名 ############################################################## # 実行前にEC2に登録した鍵(id_rsa) を sshフォルダに置く ############################################################## # SSH Tunnel認証 def get_connection(): server = SSHTunnelForwarder( (SSH_ADDRESS, SSH_PORT), ssh_username=SSH_USERNAME, ssh_pkey=SSH_PKEY, local_bind_address=(LOCAL_HOST, LOCAL_PORT), remote_bind_address=(SSH_REMOTE_BIND_ADDRESS, DB_PORT), allow_agent=False, ) server.start() db_url = f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{LOCAL_HOST}:{LOCAL_PORT}/{DATABASE_NAME}" engine = create_engine(db_url, echo=True) session_local = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=engine) ) return (declarative_base(), server) def get_session(self): return session_local()
↓ Alembic.ini
sqlalchemy.url = mysql+pymysql://DB_USERNAME:DB_PASSWORD@127.0.0.1:LOCAL_PORT/DATABASE_NAME?charset=utf8mb4
create_engine() の前に踏み台サーバーアクセスのためのひと手間加えるだけでよい。
SQLAlchemy のみであれば LOCAL_PORT は指定せず、 SSHTunnelForwarder に任せることも可能だが、
Alembicで指定する必要があるため、固定値にする必要がある。
あとは EC2 / RDS の設定値をのそのままセットすればよい。
EC2へはSSHでアクセスするため、事前に鍵ペアの生成と公開鍵のEC2への登録が必要となる。
SSH_PKEY は秘密鍵(id_rsa)のパスを指定する。
今回の案件では鍵の有効期間が1分なので、鍵の登録~実行がわちゃわちゃする。
(ここを自動化しておけばよかった)
おまけ: ケースごとの切り替え方法
ローカルか実環境か、などを実行時に意識しなくてよいよう、環境設定に連動するようにする。
最近すっかり聞かなくなったデザインパターンからFactory Methodをちょっとだけ流用。
抽象クラスを用意して、上記の3パターンでそれぞれ抽象クラスを継承したクラスを実装。
DBを操作したいレイヤー用に下記クラスを用意し、外側からは get_db() だけ呼べばあとはよろしくやってくれるようにしておく。
DB = None SessionLocal = None Server = None # DB種別取得 TYPE = 環境設定.db_type # 種別ごとにインスタンスを切り替える if TYPE == ケース1(ローカル実行): DB = DbAws() elif TYPE == ケース2(AWS実行): DB = DbBasic() elif TYPE == ケース3(踏み台アクセス): DB = DbSshTunnel() else: raise Exception Base, Server = DB.get_connection() SessionLocal = DB.get_session() def get_db(): try: yield SessionLocal finally: SessionLocal.close() if Server is not None: Server.close()
注意点としては、sshtunnelは2023/12時点では python 3.11 に対応していないので、コメントアウトしておく必要がある。。。
最後に
用途や接続先によってやり方が微妙に異なるのはよくある話。
こういうのを全て網羅したpip のモジュールがあってもよさそうだけど、ない。
RDSにアクセスする以上ついて回る話なので、今後に活かしていく。