記事の目的

DynamoDBをcloudshellからでも、ローカルからでも自由自在に扱うためのプログラム例集

対象者

DynamoDBをPythonから満遍なく操作したい人

テーブルアイテム削除プログラム

ファイル名

delete_all_table_items.py

実行例:

python3 delete_all_table_items.py Template-hogehoge

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import boto3
import argparse
 
# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルの全アイテムを削除するスクリプト')
parser.add_argument('table_name', type=str, help='削除対象のDynamoDBテーブル名')
args = parser.parse_args()
 
# DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb')
 
# 削除対象のテーブル
table = dynamodb.Table(args.table_name)
 
# テーブル内の全てのアイテムを取得
response = table.scan()
items = response['Items']
 
# テーブルのスキーマ(パーティションキーとソートキーの情報)を取得
key_schema = table.key_schema
partition_key = key_schema[0]['AttributeName']
sort_key = key_schema[1]['AttributeName'] if len(key_schema) > 1 else None
 
# 取得したアイテムを削除
for item in items:
    # パーティションキーの値を取得
    key = {partition_key: item[partition_key]}
 
    # ソートキーがある場合はソートキーも追加
    if sort_key:
        key[sort_key] = item[sort_key]
 
    # アイテムを削除
    table.delete_item(Key=key)
    print(f"Deleted item with {partition_key}: {item[partition_key]}")
 
# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items = response['Items']
    for item in items:
        key = {partition_key: item[partition_key]}
        if sort_key:
            key[sort_key] = item[sort_key]
        table.delete_item(Key=key)
        print(f"Deleted item with {partition_key}: {item[partition_key]}")

テーブルのスキーマ取得

ファイル名

getTableSchema.sh

実行例

python3 getTableSchema.sh

1
2
3
4
for table in $(aws dynamodb list-tables --query "TableNames[]" --output text); do
  echo "Table: $table"
  aws dynamodb describe-table --table-name $table --query "Table.KeySchema" --output json
done

テーブルのアイテム数取得プログラム

実行例

python3 getDBItemLength.sh

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
import boto3
import argparse
 
# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Get item count from DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
args = parser.parse_args()
 
# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')
 
# テーブル名を引数から取得
table_name = args.table_name
 
# テーブル情報の取得
response = dynamodb.describe_table(TableName=table_name)
 
# アイテム数の取得
item_count = response['Table']['ItemCount']
 
print(f"The table {table_name} contains {item_count} items.")

アイテムをDynamoDBに追加するプログラム

ファイル名

insert_dynamodb_items.py

実行例

python3 insert_dynamodb_items.py User-hogehoge users.json

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import boto3
import json
import argparse
 
# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Insert JSON data into DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
parser.add_argument('json_file', type=str, help='Path to JSON data file')
args = parser.parse_args()
 
# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')
resource = boto3.resource('dynamodb')
 
# テーブル名を引数から取得
table_name = args.table_name
table = resource.Table(table_name)
 
# テーブルのスキーマ情報からパーティションキー名を取得
key_schema = table.key_schema
partition_key = next(k['AttributeName'] for k in key_schema if k['KeyType'] == 'HASH')
 
# JSONファイルの読み込み
with open(args.json_file, 'r', encoding='utf-8') as f:
    json_data = json.load(f)
 
# 各アイテムをDynamoDBテーブルに挿入し、パーティションキーのみ表示
for item in json_data:
    response = dynamodb.put_item(
        TableName=table_name,
        Item=item
    )
    # パーティションキーのみ表示
    partition_key_value = item[partition_key]['S'# 文字列タイプのキー('S')の場合
    print(f"Inserted item with {partition_key}: {partition_key_value}")

DBのテーブルから別テーブルにアイテムをコピー

ファイル名

copy_all_items.py

実行例

python3 copy_all_items.py source_table destination_table

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import boto3
import argparse
 
# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルからテーブルへデータをコピーするスクリプト')
parser.add_argument('source_table', type=str, help='コピー元のDynamoDBテーブル名')
parser.add_argument('destination_table', type=str, help='コピー先のDynamoDBテーブル名')
args = parser.parse_args()
 
# DynamoDB クライアントの初期化
dynamodb = boto3.resource('dynamodb')
 
# コピー元テーブル(Aテーブル)
source_table = dynamodb.Table(args.source_table)
 
# コピー先テーブル(Bテーブル)
destination_table = dynamodb.Table(args.destination_table)
 
# ソーステーブルの全てのアイテムを取得
response = source_table.scan()
items = response['Items']
 
# 取得したアイテムをコピー先テーブルに挿入
with destination_table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)
        print(f"Copied item: {item}")
 
# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
    response = source_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items = response['Items']
    with destination_table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
            print(f"Copied item: {item}")

DynamoDBのテーブル名を取得するプログラム

ファイル名

get_db_table_names.py

実行例

python3 get_db_table_names.py

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
import boto3
 
# DynamoDB クライアントの初期化
dynamodb = boto3.client('dynamodb')
 
# 全テーブル名を取得
response = dynamodb.list_tables()
 
# テーブル名をソート
table_names = sorted(response['TableNames'])
 
# ソートされたテーブル名を出力
print("DynamoDB テーブル一覧(名前順):")
for table_name in table_names:
    print(table_name)