DX開発事業部の西田です。
Cloud DLP(Data Loss Prevention)は個人情報やカード番号などの機密データを検出や匿名加工できるGoogle Cloudのサービスです。
Cloud Storageにアップロードされた機密情報をCloud DLPで検出、匿名化して安全に扱える状態にするというユースケースを実現するために検証を行ったのでまとめます。
Cloud Storageにアップロードされたデータの分類の自動化
Cloud DLPによる機密情報データの隔離のユースケースで公式のチュートリアルがすでにあったため、まずはこちらを試してみました。
作るものの構成は以下のようになります。
APIを有効化
以下のAPIを有効化します。
- Cloud Functions API
- Cloud Build API
- Sensitive Data Protection (DLP)
サービス アカウントへの権限の付与
機密データの保護のサービス アカウントに権限を付与する
Cloud Shellで、InspectContentを呼び出してCloud DLP サービス エージェントを作成します。
01 02 03 04 05 06 07 08 09 10 | PROJECT_ID=`gcloud config get project` curl --request POST \ "https://dlp.googleapis.com/v2/projects/$PROJECT_ID/locations/asia-northeast1/content:inspect" \ --header "X-Goog-User-Project: $PROJECT_ID" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '{"item":{"value":"google@google.com"}}' \ --compressed |
サービスエージェントのサービスアカウントに権限を付与します。
隔離と分類のパイプラインを作成する
Cloud Storage バケットを作成する
以下の3つのバケットを作成します。
- 検疫バケット
- 機密データを隔離したバケット
- 機密データが存在しないバケット
Pub/Sub トピックとサブスクリプションを作成する
DLPジョブから通知を受け取ってCloud FunctionsがサブスクライブするためのPub/Subトピックを作成します。
トピックを作成
Cloud Functions を作成する
2つのCloud Functions関数を作成します。
ランタイムはPython 3.12としています。
オブジェクトが Cloud Storage にアップロードされたときに呼び出される関数
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | from google.cloud import dlp from google.cloud import logging import os # ---------------------------- # User-configurable Constants PROJECT_ID = os.getenv( 'DLP_PROJECT_ID' , '<my-project>' ) """The bucket the to-be-scanned files are uploaded to.""" STAGING_BUCKET = os.getenv( 'QUARANTINE_BUCKET' , '<quarantine-bucket>' ) """ Pub/Sub topic to notify once the DLP job completes.""" PUB_SUB_TOPIC = os.getenv( 'PUB_SUB_TOPIC' , 'dlp-test-pubsub' ) """The minimum_likelihood (Enum) required before returning a match""" """For more info visit: https://cloud.google.com/dlp/docs/likelihood""" MIN_LIKELIHOOD = os.getenv( 'MIN_LIKELIHOOD' , 'POSSIBLE' ) """The maximum number of findings to report (0 = server maximum)""" MAX_FINDINGS = 0 """The infoTypes of information to match. ALL_BASIC for common infoTypes""" """For more info visit: https://cloud.google.com/dlp/docs/concepts-infotypes""" INFO_TYPES = os.getenv( 'INFO_TYPES' , 'FIRST_NAME,PHONE_NUMBER,EMAIL_ADDRESS,US_SOCIAL_SECURITY_NUMBER' ).split( ',' ) APP_LOG_NAME = os.getenv( 'LOG_NAME' , 'DLP-classify-gcs-files-create-dlp-job' ) # End of User-configurable Constants # ---------------------------------- # Initialize the Google Cloud client libraries dlp = dlp.DlpServiceClient() LOG_SEVERITY_DEFAULT = 'DEFAULT' LOG_SEVERITY_INFO = 'INFO' LOG_SEVERITY_ERROR = 'ERROR' LOG_SEVERITY_WARNING = 'WARNING' LOG_SEVERITY_DEBUG = 'DEBUG' def log(text, severity = LOG_SEVERITY_DEFAULT, log_name = APP_LOG_NAME): logging_client = logging.Client() logger = logging_client.logger(log_name) return logger.log_text(text, severity = severity) def create_DLP_job(data, done): """This function is triggered by new files uploaded to the designated Cloud Storage quarantine/staging bucket. It creates a dlp job for the uploaded file. Arg: data: The Cloud Storage Event Returns: None. Debug information is printed to the log. """ # Get the targeted file in the quarantine bucket file_name = data[ 'name' ] log( 'Function triggered for file [{}] to start a DLP job of InfoTypes [{}]' . format (file_name, ',' .join(INFO_TYPES)), severity = LOG_SEVERITY_INFO) # Prepare info_types by converting the list of strings (INFO_TYPES) into a list of dictionaries info_types = [{ 'name' : info_type} for info_type in INFO_TYPES] # Convert the project id into a full resource id. parent = f "projects/{PROJECT_ID}" # Construct the configuration dictionary. inspect_job = { 'inspect_config' : { 'info_types' : info_types, 'min_likelihood' : MIN_LIKELIHOOD, 'limits' : { 'max_findings_per_request' : MAX_FINDINGS }, }, 'storage_config' : { 'cloud_storage_options' : { 'file_set' : { 'url' : 'gs://{bucket_name}/{file_name}' . format ( bucket_name = STAGING_BUCKET, file_name = file_name) } } }, 'actions' : [{ 'pub_sub' : { 'topic' : 'projects/{project_id}/topics/{topic_id}' . format ( project_id = PROJECT_ID, topic_id = PUB_SUB_TOPIC) } }] } # Create the DLP job and let the DLP api processes it. try : dlp.create_dlp_job(parent = (parent), inspect_job = (inspect_job)) log( 'Job created by create_DLP_job' , severity = LOG_SEVERITY_INFO) except Exception as e: log(e, severity = LOG_SEVERITY_ERROR) |
1 2 3 4 | # Function dependencies, for example: # package>=version google-cloud-dlp google-cloud-logging |
この関数では、ファイル名を受け取り、「FIRST_NAME,PHONE_NUMBER,EMAIL_ADDRESS,US_SOCIAL_SECURITY_NUMBER」のinfoType検出器を使いDLPジョブを起動。
検出結果をトピックにパブリッシュします。
Pub/Sub キューでメッセージを受信したときに呼び出される関数
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | from google.cloud import dlp from google.cloud import storage from google.cloud import logging import os # ---------------------------- # User-configurable Constants PROJECT_ID = os.getenv( 'DLP_PROJECT_ID' , '<my-project>' ) """The bucket the to-be-scanned files are uploaded to.""" STAGING_BUCKET = os.getenv( 'QUARANTINE_BUCKET' , '<quarantine-bucket>' ) """The bucket to move "sensitive" files to.""" SENSITIVE_BUCKET = os.getenv( 'SENSITIVE_DATA_BUCKET' , '<sensitive-data-bucket>' ) """The bucket to move "non sensitive" files to.""" NONSENSITIVE_BUCKET = os.getenv( 'INSENSITIVE_DATA_BUCKET' , '<non-sensitive-data-bucket>' ) APP_LOG_NAME = os.getenv( 'LOG_NAME' , 'DLP-classify-gcs-files-resolve-dlp' ) # End of User-configurable Constants # ---------------------------------- # Initialize the Google Cloud client libraries dlp = dlp.DlpServiceClient() storage_client = storage.Client() LOG_SEVERITY_DEFAULT = 'DEFAULT' LOG_SEVERITY_INFO = 'INFO' LOG_SEVERITY_ERROR = 'ERROR' LOG_SEVERITY_WARNING = 'WARNING' LOG_SEVERITY_DEBUG = 'DEBUG' def log(text, severity = LOG_SEVERITY_DEFAULT, log_name = APP_LOG_NAME): logging_client = logging.Client() logger = logging_client.logger(log_name) return logger.log_text(text, severity = severity) def resolve_DLP(data, context): """This function listens to the pub/sub notification from function above. As soon as it gets pub/sub notification, it picks up results from the DLP job and moves the file to sensitive bucket or nonsensitive bucket accordingly. Args: data: The Cloud Pub/Sub event Returns: None. Debug information is printed to the log. """ # Get the targeted DLP job name that is created by the create_DLP_job function job_name = data[ 'attributes' ][ 'DlpJobName' ] log( 'Received pub/sub notification from DLP job: {}' . format (job_name), severity = LOG_SEVERITY_INFO) # Get the DLP job details by the job_name job = dlp.get_dlp_job(request = { 'name' : job_name}) log( 'Job Name:{name}\nStatus:{status}' . format (name = job.name, status = job.state), severity = LOG_SEVERITY_INFO) # Fetching Filename in Cloud Storage from the original dlpJob config. # See defintion of "JSON Output' in Limiting Cloud Storage Scans': # https://cloud.google.com/dlp/docs/inspecting-storage file_path = ( job.inspect_details.requested_options.job_config.storage_config .cloud_storage_options.file_set.url) file_name = file_path.split( "/" , 3 )[ 3 ] info_type_stats = job.inspect_details.result.info_type_stats source_bucket = storage_client.get_bucket(STAGING_BUCKET) source_blob = source_bucket.blob(file_name) if ( len (info_type_stats) > 0 ): # Found at least one sensitive data for stat in info_type_stats: log( 'Found {stat_cnt} instances of {stat_type_name}.' . format ( stat_cnt = stat.count, stat_type_name = stat.info_type.name), severity = LOG_SEVERITY_WARNING) log( 'Moving item to sensitive bucket' , severity = LOG_SEVERITY_DEBUG) destination_bucket = storage_client.get_bucket(SENSITIVE_BUCKET) source_bucket.copy_blob(source_blob, destination_bucket, file_name) # copy the item to the sensitive bucket source_blob.delete() # delete item from the quarantine bucket else : # No sensitive data found log( 'Moving item to non-sensitive bucket' , severity = LOG_SEVERITY_DEBUG) destination_bucket = storage_client.get_bucket(NONSENSITIVE_BUCKET) source_bucket.copy_blob( source_blob, destination_bucket, file_name) # copy the item to the non-sensitive bucket source_blob.delete() # delete item from the quarantine bucket log( 'classifying file [{}] Finished' . format (file_name), severity = LOG_SEVERITY_DEBUG) |
1 2 3 4 5 | # Function dependencies, for example: # package>=version google-cloud-dlp google-cloud-storage google-cloud-logging |
この関数ではトピックからサブスクライブしたDLPジョブの結果を元にバケットにファイルを振り分けて移動しています。
隔離バケットにサンプル ファイルをアップロードする
サンプルデータはこちら。
こちらを検疫バケットにアップロードするとファイルは数分後削除されます。
機密情報が検知されたファイルは機密データを隔離したバケット。
機密情報が検知されなかったファイルは機密データが存在しないバケットに保存されています。
DLPジョブの実行結果はこちらから確認でき何が検出されたかを見ることができます。
Cloud Storageにアップロードされたデータの匿名化
公式のチュートリアルでは検知までで、匿名化ではなかっため手を加えて匿名化するようにしてみました。
Cloud Storage バケットを作成する
2つのバケットを作成します。
- 検疫バケット
- 匿名化後バケット
オブジェクトが Cloud Storage にアップロードされたときに呼び出される関数
こちらをサンプルに匿名化の処理を記述しました。
DLPジョブを利用しなくてよいので関数は1つにしています。
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | from google.cloud import dlp_v2 from google.cloud import storage from google.cloud import logging import os # ---------------------------- # User-configurable Constants PROJECT_ID = os.getenv( 'DLP_PROJECT_ID' , '<my-project>' ) """The bucket the to-be-scanned files are uploaded to.""" STAGING_BUCKET = os.getenv( 'QUARANTINE_BUCKET' , '<quarantine-bucket>' ) """The bucket to move "deidentify" files to.""" DEIDENTIFY_BUCKET = os.getenv( 'DEIDENTIFY_DATA_BUCKET' , '<deidentify-bucket>' ) """The infoTypes of information to match. ALL_BASIC for common infoTypes""" """For more info visit: https://cloud.google.com/dlp/docs/concepts-infotypes""" INFO_TYPES = [ 'FIRST_NAME' , 'AGE' , 'EMAIL_ADDRESS' , 'CREDIT_CARD_NUMBER' , ] APP_LOG_NAME = os.getenv( 'LOG_NAME' , 'DLP-classify-gcs-files-deidentify-dlp' ) ALLOWED_EXTENSIONS = [ '.txt' , '.csv' ] # End of User-configurable Constants # ---------------------------------- # Initialize the Google Cloud client libraries dlp = dlp_v2.DlpServiceClient() storage_client = storage.Client() LOG_SEVERITY_DEFAULT = 'DEFAULT' LOG_SEVERITY_INFO = 'INFO' LOG_SEVERITY_ERROR = 'ERROR' LOG_SEVERITY_WARNING = 'WARNING' LOG_SEVERITY_DEBUG = 'DEBUG' def log(text, severity = LOG_SEVERITY_DEFAULT, log_name = APP_LOG_NAME): logging_client = logging.Client() logger = logging_client.logger(log_name) return logger.log_text(text, severity = severity) def deidentify_DLP(data, done, masking_character = "*" ): """This function is triggered by new files uploaded to the designated Cloud Storage quarantine/staging bucket. It creates a dlp job for the uploaded file. Arg: data: The Cloud Storage Event Returns: None. Debug information is printed to the log. """ # Get the targeted file in the quarantine bucket source_file_name = data[ 'name' ] log( 'Function triggered for file [{}] to start a DLP of InfoTypes [{}]' . format (source_file_name, ',' .join(INFO_TYPES)), severity = LOG_SEVERITY_INFO) if not any (source_file_name.endswith(ext) for ext in ALLOWED_EXTENSIONS): print (f "Ignored file: {source_file_name}" ) return # Download file source_bucket = storage_client.bucket(STAGING_BUCKET) source_blob = source_bucket.blob(source_file_name) temp_file_path = f "/tmp/{source_file_name.replace('/', '_')}" source_blob.download_to_filename(temp_file_path) with open (temp_file_path, 'r' , encoding = 'utf-8' ) as file : file_content = file .read() # Prepare info_types by converting the list of strings (INFO_TYPES) into a list of dictionaries info_types = [{ 'name' : info_type} for info_type in INFO_TYPES] # Convert the project id into a full resource id. parent = f "projects/{PROJECT_ID}" # Execute deidentify try : response = dlp.deidentify_content( request = { 'parent' : parent, 'inspect_config' : { 'info_types' : info_types, 'include_quote' : True }, 'deidentify_config' : { 'info_type_transformations' : { 'transformations' : [ { 'primitive_transformation' : { 'character_mask_config' : { 'masking_character' : masking_character, } } } ] } }, 'item' : { 'value' : file_content } } ) modified_content = response.item.value # Save deidentify contents with open (temp_file_path, 'w' , encoding = 'utf-8' ) as file : file .write(modified_content) destination_bucket = storage_client.bucket(DEIDENTIFY_BUCKET) destination_blob = destination_bucket.blob(source_file_name) destination_blob.upload_from_filename(temp_file_path) source_blob.delete() # delete item from the quarantine bucket except Exception as e: log(e, severity = LOG_SEVERITY_ERROR) |
1 2 3 4 5 | # Function dependencies, for example: # package>=version google-cloud-dlp google-cloud-storage google-cloud-logging |
これで検疫バケットに下記のようなテキストファイルをアップロードしてみます。
1 2 3 | 田中太郎 メールアドレス:foobar@example.com 電話番号:4242424242424242 |
検疫バケットからは削除され匿名化後バケットにこのように匿名化されたファイルが保存されることが確認できました。
1 2 3 | 田中** メールアドレス:****************** 電話番号:**************** |
データの安全性を確保してから分析や機械学習用途で利用する際に有用なユースケースとなるのではないでしょうか。
以下の記事も匿名化に関連していますのでご興味があればぜひ確認ください。