DX開発事業部の西田です。

Cloud Data Loss Prevention | Google Cloud
Cloud DLP を使用すると、特に機密性の高いデータ要素を自動的に検出、分類、保護できます。

Cloud DLP(Data Loss Prevention)は個人情報やカード番号などの機密データを検出や匿名加工できるGoogle Cloudのサービスです。

Cloud Storageにアップロードされた機密情報をCloud DLPで検出、匿名化して安全に扱える状態にするというユースケースを実現するために検証を行ったのでまとめます。

Cloud Storageにアップロードされたデータの分類の自動化

Cloud DLPによる機密情報データの隔離のユースケースで公式のチュートリアルがすでにあったため、まずはこちらを試してみました。

作るものの構成は以下のようになります。

APIを有効化

以下のAPIを有効化します。

  • Cloud Functions API
  • Cloud Build API
  • Sensitive Data Protection (DLP)

サービス アカウントへの権限の付与

機密データの保護のサービス アカウントに権限を付与する

Cloud Shellで、InspectContentを呼び出してCloud DLP サービス エージェントを作成します。

01
02
03
04
05
06
07
08
09
10
PROJECT_ID=`gcloud config get project`
 
curl --request POST \
 "https://dlp.googleapis.com/v2/projects/$PROJECT_ID/locations/asia-northeast1/content:inspect" \
 --header "X-Goog-User-Project: $PROJECT_ID" \
 --header "Authorization: Bearer $(gcloud auth print-access-token)" \
 --header 'Accept: application/json' \
 --header 'Content-Type: application/json' \
 --data '{"item":{"value":"google@google.com"}}' \
 --compressed

サービスエージェントのサービスアカウントに権限を付与します。

隔離と分類のパイプラインを作成する

Cloud Storage バケットを作成する

以下の3つのバケットを作成します。

  • 検疫バケット
  • 機密データを隔離したバケット
  • 機密データが存在しないバケット

Pub/Sub トピックとサブスクリプションを作成する

DLPジョブから通知を受け取ってCloud FunctionsがサブスクライブするためのPub/Subトピックを作成します。

トピックを作成

Cloud Functions を作成する

2つのCloud Functions関数を作成します。
ランタイムはPython 3.12としています。

オブジェクトが Cloud Storage にアップロードされたときに呼び出される関数

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from google.cloud import dlp
from google.cloud import logging
import os
 
# ----------------------------
#  User-configurable Constants
 
PROJECT_ID = os.getenv('DLP_PROJECT_ID', '<my-project>')
"""The bucket the to-be-scanned files are uploaded to."""
STAGING_BUCKET = os.getenv('QUARANTINE_BUCKET', '<quarantine-bucket>')
""" Pub/Sub topic to notify once the  DLP job completes."""
PUB_SUB_TOPIC = os.getenv('PUB_SUB_TOPIC', 'dlp-test-pubsub')
"""The minimum_likelihood (Enum) required before returning a match"""
"""For more info visit: https://cloud.google.com/dlp/docs/likelihood"""
MIN_LIKELIHOOD = os.getenv('MIN_LIKELIHOOD', 'POSSIBLE')
"""The maximum number of findings to report (0 = server maximum)"""
MAX_FINDINGS = 0
"""The infoTypes of information to match. ALL_BASIC for common infoTypes"""
"""For more info visit: https://cloud.google.com/dlp/docs/concepts-infotypes"""
INFO_TYPES = os.getenv('INFO_TYPES', 'FIRST_NAME,PHONE_NUMBER,EMAIL_ADDRESS,US_SOCIAL_SECURITY_NUMBER').split(',')
 
APP_LOG_NAME = os.getenv('LOG_NAME', 'DLP-classify-gcs-files-create-dlp-job')
 
# End of User-configurable Constants
# ----------------------------------
 
# Initialize the Google Cloud client libraries
dlp = dlp.DlpServiceClient()
 
LOG_SEVERITY_DEFAULT = 'DEFAULT'
LOG_SEVERITY_INFO = 'INFO'
LOG_SEVERITY_ERROR = 'ERROR'
LOG_SEVERITY_WARNING = 'WARNING'
LOG_SEVERITY_DEBUG = 'DEBUG'
 
 
def log(text, severity=LOG_SEVERITY_DEFAULT, log_name=APP_LOG_NAME):
    logging_client = logging.Client()
    logger = logging_client.logger(log_name)
 
    return logger.log_text(text, severity=severity)
 
 
def create_DLP_job(data, done):
    """This function is triggered by new files uploaded to the designated Cloud Storage quarantine/staging bucket.
 
         It creates a dlp job for the uploaded file.
      Arg:
         data: The Cloud Storage Event
      Returns:
          None. Debug information is printed to the log.
      """
    # Get the targeted file in the quarantine bucket
    file_name = data['name']
    log('Function triggered for file [{}] to start a DLP job of InfoTypes [{}]'.format(file_name, ','.join(INFO_TYPES)),
        severity=LOG_SEVERITY_INFO)
 
    # Prepare info_types by converting the list of strings (INFO_TYPES) into a list of dictionaries
    info_types = [{'name': info_type} for info_type in INFO_TYPES]
 
    # Convert the project id into a full resource id.
    parent = f"projects/{PROJECT_ID}"
 
    # Construct the configuration dictionary.
    inspect_job = {
        'inspect_config': {
            'info_types': info_types,
            'min_likelihood': MIN_LIKELIHOOD,
            'limits': {
                'max_findings_per_request': MAX_FINDINGS
            },
        },
        'storage_config': {
            'cloud_storage_options': {
                'file_set': {
                    'url':
                        'gs://{bucket_name}/{file_name}'.format(
                            bucket_name=STAGING_BUCKET, file_name=file_name)
                }
            }
        },
        'actions': [{
            'pub_sub': {
                'topic':
                    'projects/{project_id}/topics/{topic_id}'.format(
                        project_id=PROJECT_ID, topic_id=PUB_SUB_TOPIC)
            }
        }]
    }
 
    # Create the DLP job and let the DLP api processes it.
    try:
        dlp.create_dlp_job(parent=(parent), inspect_job=(inspect_job))
        log('Job created by create_DLP_job', severity=LOG_SEVERITY_INFO)
    except Exception as e:
        log(e, severity=LOG_SEVERITY_ERROR)
1
2
3
4
# Function dependencies, for example:
# package>=version
google-cloud-dlp
google-cloud-logging

この関数では、ファイル名を受け取り、「FIRST_NAME,PHONE_NUMBER,EMAIL_ADDRESS,US_SOCIAL_SECURITY_NUMBER」のinfoType検出器を使いDLPジョブを起動。
検出結果をトピックにパブリッシュします。

Pub/Sub キューでメッセージを受信したときに呼び出される関数

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from google.cloud import dlp
from google.cloud import storage
from google.cloud import logging
import os
 
# ----------------------------
#  User-configurable Constants
 
PROJECT_ID = os.getenv('DLP_PROJECT_ID', '<my-project>')
"""The bucket the to-be-scanned files are uploaded to."""
STAGING_BUCKET = os.getenv('QUARANTINE_BUCKET', '<quarantine-bucket>')
"""The bucket to move "sensitive" files to."""
SENSITIVE_BUCKET = os.getenv('SENSITIVE_DATA_BUCKET', '<sensitive-data-bucket>')
"""The bucket to move "non sensitive" files to."""
NONSENSITIVE_BUCKET = os.getenv('INSENSITIVE_DATA_BUCKET', '<non-sensitive-data-bucket>')
 
APP_LOG_NAME = os.getenv('LOG_NAME', 'DLP-classify-gcs-files-resolve-dlp')
 
# End of User-configurable Constants
# ----------------------------------
 
# Initialize the Google Cloud client libraries
dlp = dlp.DlpServiceClient()
storage_client = storage.Client()
 
LOG_SEVERITY_DEFAULT = 'DEFAULT'
LOG_SEVERITY_INFO = 'INFO'
LOG_SEVERITY_ERROR = 'ERROR'
LOG_SEVERITY_WARNING = 'WARNING'
LOG_SEVERITY_DEBUG = 'DEBUG'
 
 
def log(text, severity=LOG_SEVERITY_DEFAULT, log_name=APP_LOG_NAME):
    logging_client = logging.Client()
    logger = logging_client.logger(log_name)
 
    return logger.log_text(text, severity=severity)
 
 
def resolve_DLP(data, context):
    """This function listens to the pub/sub notification from function above.
 
      As soon as it gets pub/sub notification, it picks up results from the
      DLP job and moves the file to sensitive bucket or nonsensitive bucket
      accordingly.
      Args:
          data: The Cloud Pub/Sub event
 
      Returns:
          None. Debug information is printed to the log.
      """
    # Get the targeted DLP job name that is created by the create_DLP_job function
    job_name = data['attributes']['DlpJobName']
    log('Received pub/sub notification from DLP job: {}'.format(job_name), severity=LOG_SEVERITY_INFO)
 
    # Get the DLP job details by the job_name
    job = dlp.get_dlp_job(request={'name': job_name})
    log('Job Name:{name}\nStatus:{status}'.format(name=job.name, status=job.state), severity=LOG_SEVERITY_INFO)
 
    # Fetching Filename in Cloud Storage from the original dlpJob config.
    # See defintion of "JSON Output' in Limiting Cloud Storage Scans':
    # https://cloud.google.com/dlp/docs/inspecting-storage
 
    file_path = (
        job.inspect_details.requested_options.job_config.storage_config
            .cloud_storage_options.file_set.url)
    file_name = file_path.split("/", 3)[3]
 
    info_type_stats = job.inspect_details.result.info_type_stats
    source_bucket = storage_client.get_bucket(STAGING_BUCKET)
    source_blob = source_bucket.blob(file_name)
    if (len(info_type_stats) > 0):
        # Found at least one sensitive data
        for stat in info_type_stats:
            log('Found {stat_cnt} instances of {stat_type_name}.'.format(
                stat_cnt=stat.count, stat_type_name=stat.info_type.name), severity=LOG_SEVERITY_WARNING)
        log('Moving item to sensitive bucket', severity=LOG_SEVERITY_DEBUG)
        destination_bucket = storage_client.get_bucket(SENSITIVE_BUCKET)
        source_bucket.copy_blob(source_blob, destination_bucket,
                                file_name)  # copy the item to the sensitive bucket
        source_blob.delete()  # delete item from the quarantine bucket
 
    else:
        # No sensitive data found
        log('Moving item to non-sensitive bucket', severity=LOG_SEVERITY_DEBUG)
        destination_bucket = storage_client.get_bucket(NONSENSITIVE_BUCKET)
        source_bucket.copy_blob(
            source_blob, destination_bucket,
            file_name)  # copy the item to the non-sensitive bucket
        source_blob.delete()  # delete item from the quarantine bucket
    log('classifying file [{}] Finished'.format(file_name), severity=LOG_SEVERITY_DEBUG)
1
2
3
4
5
# Function dependencies, for example:
# package>=version
google-cloud-dlp
google-cloud-storage
google-cloud-logging

この関数ではトピックからサブスクライブしたDLPジョブの結果を元にバケットにファイルを振り分けて移動しています。

隔離バケットにサンプル ファイルをアップロードする

サンプルデータはこちら

こちらを検疫バケットにアップロードするとファイルは数分後削除されます。
機密情報が検知されたファイルは機密データを隔離したバケット。
機密情報が検知されなかったファイルは機密データが存在しないバケットに保存されています。

DLPジョブの実行結果はこちらから確認でき何が検出されたかを見ることができます。

Cloud Storageにアップロードされたデータの匿名化

公式のチュートリアルでは検知までで、匿名化ではなかっため手を加えて匿名化するようにしてみました。

Cloud Storage バケットを作成する

2つのバケットを作成します。

  • 検疫バケット
  • 匿名化後バケット

オブジェクトが Cloud Storage にアップロードされたときに呼び出される関数

こちらをサンプルに匿名化の処理を記述しました。
DLPジョブを利用しなくてよいので関数は1つにしています。

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from google.cloud import dlp_v2
from google.cloud import storage
from google.cloud import logging
import os
 
# ----------------------------
#  User-configurable Constants
 
PROJECT_ID = os.getenv('DLP_PROJECT_ID', '<my-project>')
"""The bucket the to-be-scanned files are uploaded to."""
STAGING_BUCKET = os.getenv('QUARANTINE_BUCKET', '<quarantine-bucket>')
"""The bucket to move "deidentify" files to."""
DEIDENTIFY_BUCKET = os.getenv('DEIDENTIFY_DATA_BUCKET', '<deidentify-bucket>')
 
"""The infoTypes of information to match. ALL_BASIC for common infoTypes"""
"""For more info visit: https://cloud.google.com/dlp/docs/concepts-infotypes"""
INFO_TYPES = [
    'FIRST_NAME',
    'AGE',
    'EMAIL_ADDRESS',
    'CREDIT_CARD_NUMBER',
]
 
APP_LOG_NAME = os.getenv('LOG_NAME', 'DLP-classify-gcs-files-deidentify-dlp')
 
ALLOWED_EXTENSIONS = ['.txt', '.csv']
 
# End of User-configurable Constants
# ----------------------------------
 
# Initialize the Google Cloud client libraries
dlp = dlp_v2.DlpServiceClient()
storage_client = storage.Client()
 
LOG_SEVERITY_DEFAULT = 'DEFAULT'
LOG_SEVERITY_INFO = 'INFO'
LOG_SEVERITY_ERROR = 'ERROR'
LOG_SEVERITY_WARNING = 'WARNING'
LOG_SEVERITY_DEBUG = 'DEBUG'
 
 
def log(text, severity=LOG_SEVERITY_DEFAULT, log_name=APP_LOG_NAME):
    logging_client = logging.Client()
    logger = logging_client.logger(log_name)
 
    return logger.log_text(text, severity=severity)
 
 
def deidentify_DLP(data, done, masking_character="*"):
    """This function is triggered by new files uploaded to the designated Cloud Storage quarantine/staging bucket.
 
         It creates a dlp job for the uploaded file.
      Arg:
         data: The Cloud Storage Event
      Returns:
          None. Debug information is printed to the log.
      """
    # Get the targeted file in the quarantine bucket
    source_file_name = data['name']
    log('Function triggered for file [{}] to start a DLP of InfoTypes [{}]'.format(source_file_name, ','.join(INFO_TYPES)),
        severity=LOG_SEVERITY_INFO)
 
    if not any(source_file_name.endswith(ext) for ext in ALLOWED_EXTENSIONS):
        print(f"Ignored file: {source_file_name}")
        return
 
    # Download file
    source_bucket = storage_client.bucket(STAGING_BUCKET)
    source_blob = source_bucket.blob(source_file_name)
 
    temp_file_path = f"/tmp/{source_file_name.replace('/', '_')}"
    source_blob.download_to_filename(temp_file_path)
 
    with open(temp_file_path, 'r', encoding='utf-8') as file:
        file_content = file.read()
 
    # Prepare info_types by converting the list of strings (INFO_TYPES) into a list of dictionaries
    info_types = [{'name': info_type} for info_type in INFO_TYPES]
 
    # Convert the project id into a full resource id.
    parent = f"projects/{PROJECT_ID}"
 
    # Execute deidentify
    try:
      response = dlp.deidentify_content(
          request={
              'parent': parent,
              'inspect_config': {
                  'info_types': info_types,
                  'include_quote': True
              },
              'deidentify_config': {
                  'info_type_transformations': {
                      'transformations': [
                          {
                              'primitive_transformation': {
                                  'character_mask_config': {
                                      'masking_character': masking_character,
                                  }
                              }
                          }
                      ]
                  }
              },
              'item': {
                'value': file_content
              }
          }
      )
 
      modified_content = response.item.value
 
      # Save deidentify contents
      with open(temp_file_path, 'w', encoding='utf-8') as file:
          file.write(modified_content)
 
      destination_bucket = storage_client.bucket(DEIDENTIFY_BUCKET)
      destination_blob = destination_bucket.blob(source_file_name)
 
      destination_blob.upload_from_filename(temp_file_path)
 
      source_blob.delete()  # delete item from the quarantine bucket
 
    except Exception as e:
        log(e, severity=LOG_SEVERITY_ERROR)
1
2
3
4
5
# Function dependencies, for example:
# package>=version
google-cloud-dlp
google-cloud-storage
google-cloud-logging

これで検疫バケットに下記のようなテキストファイルをアップロードしてみます。

1
2
3
田中太郎
メールアドレス:foobar@example.com
電話番号:4242424242424242

検疫バケットからは削除され匿名化後バケットにこのように匿名化されたファイルが保存されることが確認できました。

1
2
3
田中**
メールアドレス:******************
電話番号:****************

データの安全性を確保してから分析や機械学習用途で利用する際に有用なユースケースとなるのではないでしょうか。

以下の記事も匿名化に関連していますのでご興味があればぜひ確認ください。