はじめに

昨今勢いを増すDatabricksで、行レベルセキュリティ(Row Level Security, RLS)を行う場合に
どういった設定を行えばいいのかを整理するために作業イメージが湧くコードをまとめました。
以下の手順で紹介していきます。

  • DB設定
  • セキュリティ設定 – UnityCatalogでRLSを実現
  • SQL Warehouseの設定

前提

  • よくある販売店などテナント別で情報を出し分ける(マルチテナント)ダッシュボードを実現したい
  • RLSで制御し担当者の属性で表示情報をフィルタリングしたい
  • Snowflake Connectorで外部DBと接続
  • 前回取得時点との差分を使って差分データを更新したい。
  • Snowflake から取得したデータを Delta Lake に取り込む
  • 例として、special_idとsub_idがユーザーと紐づく
  • special_idとsub_idを用いて行レベルのセキュリティ設定を行う。
  • データレイクにはS3を使用。(Delta Lakeとして活用)

この記事でやらないこと

ダッシュボード自体のビジュアル構築

DB接続設定

まずDB接続設定です。以下のようにpythonでオプション設定を行い、コネクターの実装を行います。

sfOptions = {
  "sfURL": "xxxxxx.ap-northeast-1.aws.snowflakecomputing.com",
  "sfUser": dbutils.secrets.get("snowflake", "user"),
  "sfPassword": dbutils.secrets.get("snowflake", "password"),
  "sfDatabase": "MY_DB",
  "sfSchema": "PUBLIC",
  "sfWarehouse": "MY_WH",
  "sfRole": "MY_ROLE",
}

※dbutils.secretsはDatabricks Secret Scopeに保存されたものを使うので
あらかじめ設定しておく必要があります。こちら後述します

差分取り込み処理

取り込み先(済み)のDelta Lake の最新タイムスタンプを用いて、更新分のレコードを検知し差分取り込みを行います。

from pyspark.sql import functions as F

target_path = "s3://my-bucket/delta/bronze/source_table"

# 前回のwatermark取得(無ければ初回)
try:
    last_ts = spark.read.format("delta").load(target_path) \
        .agg(F.max("updated_at").alias("mx")).collect()[0]["mx"]
except:
    last_ts = None

query = "select * from SOURCE_TABLE"
if last_ts is not None:
    query += f" where updated_at > '{last_ts}'"

src_df = (spark.read.format("snowflake")
  .options(**sfOptions)
  .option("query", query)
  .load()
)

# upsertが必要ならMERGE(次を参照)
(src_df.write.format("delta")
  .mode("append")
  .save(target_path)
)

Upsert(同じPKの更新も取り込みたい):Delta MERGEの実装イメージ

t.{pk} = s.{pk}の部分は記載の通りaliasで、
ターゲットのプライマリキーとソースのプライマリキーで比較を行っています。
一致行は全カラム更新、不一致行は全カラム挿入を行います。

from delta.tables import DeltaTable

target_path = "s3://my-bucket/delta/silver/source_table"
pk = "id"

# 初回は作る
if not DeltaTable.isDeltaTable(spark, target_path):
    (src_df.write.format("delta").mode("overwrite").save(target_path))

t = DeltaTable.forPath(spark, target_path)

(t.alias("t")
 .merge(src_df.alias("s"), f"t.{pk} = s.{pk}")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute()
)

dbutils.secretsへの設定方法

① CLI 認証

databricks configure

② Secret Scope 作成

databricks secrets create-scope --scope snowflake

③ Secret 登録

databricks secrets put \
  --scope snowflake \
  --key password

実行すると プロンプトで値を入力(表示されない)されます。

Enter secret value: ********

これで以下が使えるようになります。

dbutils.secrets.get("snowflake", "password")

マルチテナント実装関連

テーブルの作成とRLS(Row filter)の設定と紐付けが必要なので、その作業を行います。

属性は以下で、テーブルにRow filter functionをALTER TABLEでつける形です。

  • テーブル:main.sec.user_store_acl
  • Row filter function:main.sec.rls_special_store

テーブル作成

RLSを実現するための、人と権限の紐付け設定です。

CREATE TABLE IF NOT EXISTS main.sec.user_store_acl (
  user_email STRING,
  special_id   STRING,
  sub_id   STRING
);

-- 例: 登録
INSERT INTO main.sec.user_store_acl VALUES
  ('a@example.com','C001','S010'),
  ('a@example.com','C001','S011'),
  ('b@example.com','C002','S020');

2) Row filter function(special_id + sub_id の2キーで判定)

上で作った権限設定でRLSの設定を行います。

CREATE OR REPLACE FUNCTION main.sec.rls_special_store(special_id STRING, sub_id STRING)
RETURN
  EXISTS (
    SELECT 1
    FROM main.sec.user_store_acl a
    WHERE a.user_email = current_user()
      AND a.special_id   = special_id
      AND a.sub_id   = sub_id
  );

3) 対象テーブルに適用(UIの “Add filter” 相当)

テーブルへのRow filter functionの紐付け

ALTER TABLE main.sales.fact_sales
SET ROW FILTER main.sec.rls_special_store ON (special_id, sub_id);

※SET ROW FILTER main.sec.rls_special_store ON (special_id, sub_id)を毎回したくない場合、タグの運用を行うことで簡略化し、事故を防ぐことが可能

SQL Warehouse関連

次に、Delta Lake 上のテーブルを SQL で操作するための SQL Warehouseの設定です。

1-1. 一覧取得

warehouse_idを知るための処理

curl -sS -X GET "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}"

1-2. 作成

SQL Warehouseの作成

curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "analytics-wh",
    "cluster_size": "2X-Small",
    "auto_stop_mins": 10,
    "max_num_clusters": 1,
    "enable_serverless_compute": true
  }'

1-3. 設定変更(edit)

SQL Warehouseの設定変更例です。

curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/warehouses/${WAREHOUSE_ID}/edit" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "analytics-wh",
    "auto_stop_mins": 20
  }'

2) Warehouse上でSQLを実行する(Statement Execution API 2.0)

SQL Warehouseの実行例です。

curl -sS -X POST "https://${DATABRICKS_HOST}/api/2.0/sql/statements/" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "warehouse_id": "'"${WAREHOUSE_ID}"'",
    "catalog": "main",
    "schema": "sales",
    "statement": "SELECT count(*) AS cnt FROM fact_sales"
  }'

3) PythonでSQL実行(Databricks SQL Connector)

PythonでSQL実行する場合は以下のようなイメージになります。

from databricks import sql
import os

conn = sql.connect(
    server_hostname=os.environ["DATABRICKS_HOSTNAME"],  # 例: adb-xxxx.azuredatabricks.net
    http_path=os.environ["DATABRICKS_HTTP_PATH"],        # SQL WarehouseのHTTP Path
    access_token=os.environ["DATABRICKS_TOKEN"],
)

with conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM main.sales.fact_sales")
    print(cur.fetchall())

終わりに

以上となります。
手順としては以下を行いました。

  • DB設定
  • セキュリティ設定 – UnityCatalogでRLSを実現
  • SQL Warehouseの設定

このあとは

  • Databricks AI/BIのダッシュボードを作りユーザー提供
  • Databricks appsなどを使って簡易にアプリを作りSQL Warehouseと紐付けて操作

など様々なアプローチがあると思います。

行レベルの権限管理を手厚くサポートしているDatabricksとAWSの組み合わせで
早急なビジネスダッシュボード構築ができそうですね。