はじめに
昨今勢いを増すDatabricksで、行レベルセキュリティ(Row Level Security, RLS)を行う場合に
どういった設定を行えばいいのかを整理するために作業イメージが湧くコードをまとめました。
以下の手順で紹介していきます。
- DB設定
- セキュリティ設定 – UnityCatalogでRLSを実現
- SQL Warehouseの設定
前提
- よくある販売店などテナント別で情報を出し分ける(マルチテナント)ダッシュボードを実現したい
- RLSで制御し担当者の属性で表示情報をフィルタリングしたい
- Snowflake Connectorで外部DBと接続
- 前回取得時点との差分を使って差分データを更新したい。
- Snowflake から取得したデータを Delta Lake に取り込む
- 例として、special_idとsub_idがユーザーと紐づく
- special_idとsub_idを用いて行レベルのセキュリティ設定を行う。
- データレイクにはS3を使用。(Delta Lakeとして活用)
この記事でやらないこと
ダッシュボード自体のビジュアル構築
DB接続設定
まずDB接続設定です。以下のようにpythonでオプション設定を行い、コネクターの実装を行います。
sfOptions = {
"sfURL": "xxxxxx.ap-northeast-1.aws.snowflakecomputing.com",
"sfUser": dbutils.secrets.get("snowflake", "user"),
"sfPassword": dbutils.secrets.get("snowflake", "password"),
"sfDatabase": "MY_DB",
"sfSchema": "PUBLIC",
"sfWarehouse": "MY_WH",
"sfRole": "MY_ROLE",
}
※dbutils.secretsはDatabricks Secret Scopeに保存されたものを使うので
あらかじめ設定しておく必要があります。こちら後述します
差分取り込み処理
取り込み先(済み)のDelta Lake の最新タイムスタンプを用いて、更新分のレコードを検知し差分取り込みを行います。
from pyspark.sql import functions as F
target_path = "s3://my-bucket/delta/bronze/source_table"
# 前回のwatermark取得(無ければ初回)
try:
last_ts = spark.read.format("delta").load(target_path) \
.agg(F.max("updated_at").alias("mx")).collect()[0]["mx"]
except:
last_ts = None
query = "select * from SOURCE_TABLE"
if last_ts is not None:
query += f" where updated_at > '{last_ts}'"
src_df = (spark.read.format("snowflake")
.options(**sfOptions)
.option("query", query)
.load()
)
# upsertが必要ならMERGE(次を参照)
(src_df.write.format("delta")
.mode("append")
.save(target_path)
)
Upsert(同じPKの更新も取り込みたい):Delta MERGEの実装イメージ
t.{pk} = s.{pk}の部分は記載の通りaliasで、
ターゲットのプライマリキーとソースのプライマリキーで比較を行っています。
一致行は全カラム更新、不一致行は全カラム挿入を行います。
from delta.tables import DeltaTable
target_path = "s3://my-bucket/delta/silver/source_table"
pk = "id"
# 初回は作る
if not DeltaTable.isDeltaTable(spark, target_path):
(src_df.write.format("delta").mode("overwrite").save(target_path))
t = DeltaTable.forPath(spark, target_path)
(t.alias("t")
.merge(src_df.alias("s"), f"t.{pk} = s.{pk}")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
dbutils.secretsへの設定方法
① CLI 認証
databricks configure
② Secret Scope 作成
databricks secrets create-scope --scope snowflake
③ Secret 登録
databricks secrets put \ --scope snowflake \ --key password
実行すると プロンプトで値を入力(表示されない)されます。
Enter secret value: ********
これで以下が使えるようになります。
dbutils.secrets.get("snowflake", "password")
マルチテナント実装関連
テーブルの作成とRLS(Row filter)の設定と紐付けが必要なので、その作業を行います。
属性は以下で、テーブルにRow filter functionをALTER TABLEでつける形です。
- テーブル:main.sec.user_store_acl
- Row filter function:main.sec.rls_special_store
テーブル作成
RLSを実現するための、人と権限の紐付け設定です。
CREATE TABLE IF NOT EXISTS main.sec.user_store_acl (
user_email STRING,
special_id STRING,
sub_id STRING
);
-- 例: 登録
INSERT INTO main.sec.user_store_acl VALUES
('a@example.com','C001','S010'),
('a@example.com','C001','S011'),
('b@example.com','C002','S020');
2) Row filter function(special_id + sub_id の2キーで判定)
上で作った権限設定でRLSの設定を行います。
CREATE OR REPLACE FUNCTION main.sec.rls_special_store(special_id STRING, sub_id STRING)
RETURN
EXISTS (
SELECT 1
FROM main.sec.user_store_acl a
WHERE a.user_email = current_user()
AND a.special_id = special_id
AND a.sub_id = sub_id
);
3) 対象テーブルに適用(UIの “Add filter” 相当)
テーブルへのRow filter functionの紐付け
ALTER TABLE main.sales.fact_sales SET ROW FILTER main.sec.rls_special_store ON (special_id, sub_id);
※SET ROW FILTER main.sec.rls_special_store ON (special_id, sub_id)を毎回したくない場合、タグの運用を行うことで簡略化し、事故を防ぐことが可能
SQL Warehouse関連
次に、Delta Lake 上のテーブルを SQL で操作するための SQL Warehouseの設定です。
1-1. 一覧取得
warehouse_idを知るための処理
curl -sS -X GET "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}"
1-2. 作成
SQL Warehouseの作成
curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"name": "analytics-wh",
"cluster_size": "2X-Small",
"auto_stop_mins": 10,
"max_num_clusters": 1,
"enable_serverless_compute": true
}'
1-3. 設定変更(edit)
SQL Warehouseの設定変更例です。
curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses/${WAREHOUSE_ID}/edit" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"name": "analytics-wh",
"auto_stop_mins": 20
}'
2) Warehouse上でSQLを実行する(Statement Execution API 2.0)
SQL Warehouseの実行例です。
curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/statements/" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
-H "Content-Type: application/json" \
-d '{
"warehouse_id": "'"${WAREHOUSE_ID}"'",
"catalog": "main",
"schema": "sales",
"statement": "SELECT count(*) AS cnt FROM fact_sales"
}'
3) PythonでSQL実行(Databricks SQL Connector)
PythonでSQL実行する場合は以下のようなイメージになります。
from databricks import sql
import os
conn = sql.connect(
server_hostname=os.environ["DATABRICKS_HOSTNAME"], # 例: adb-xxxx.azuredatabricks.net
http_path=os.environ["DATABRICKS_HTTP_PATH"], # SQL WarehouseのHTTP Path
access_token=os.environ["DATABRICKS_TOKEN"],
)
with conn.cursor() as cur:
cur.execute("SELECT count(*) FROM main.sales.fact_sales")
print(cur.fetchall())
終わりに
以上となります。
手順としては以下を行いました。
- DB設定
- セキュリティ設定 – UnityCatalogでRLSを実現
- SQL Warehouseの設定
このあとは
- Databricks AI/BIのダッシュボードを作りユーザー提供
- Databricks appsなどを使って簡易にアプリを作りSQL Warehouseと紐付けて操作
など様々なアプローチがあると思います。
行レベルの権限管理を手厚くサポートしているDatabricksとAWSの組み合わせで
早急なビジネスダッシュボード構築ができそうですね。