記事の目的
DynamoDBをcloudshellからでも、ローカルからでも自由自在に扱うためのプログラム例集
対象者
DynamoDBをPythonから満遍なく操作したい人
テーブルアイテム削除プログラム
ファイル名
delete_all_table_items.py
実行例:
python3 delete_all_table_items.py Template-hogehoge
import boto3
import argparse
# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルの全アイテムを削除するスクリプト')
parser.add_argument('table_name', type=str, help='削除対象のDynamoDBテーブル名')
args = parser.parse_args()
# DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb')
# 削除対象のテーブル
table = dynamodb.Table(args.table_name)
# テーブル内の全てのアイテムを取得
response = table.scan()
items = response['Items']
# テーブルのスキーマ(パーティションキーとソートキーの情報)を取得
key_schema = table.key_schema
partition_key = key_schema[0]['AttributeName']
sort_key = key_schema[1]['AttributeName'] if len(key_schema) > 1 else None
# 取得したアイテムを削除
for item in items:
# パーティションキーの値を取得
key = {partition_key: item[partition_key]}
# ソートキーがある場合はソートキーも追加
if sort_key:
key[sort_key] = item[sort_key]
# アイテムを削除
table.delete_item(Key=key)
print(f"Deleted item with {partition_key}: {item[partition_key]}")
# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
items = response['Items']
for item in items:
key = {partition_key: item[partition_key]}
if sort_key:
key[sort_key] = item[sort_key]
table.delete_item(Key=key)
print(f"Deleted item with {partition_key}: {item[partition_key]}")
テーブルのスキーマ取得
ファイル名
getTableSchema.sh
実行例
python3 getTableSchema.sh
for table in $(aws dynamodb list-tables --query "TableNames[]" --output text); do echo "Table: $table" aws dynamodb describe-table --table-name $table --query "Table.KeySchema" --output json done
テーブルのアイテム数取得プログラム
実行例
python3 getDBItemLength.sh
import boto3
import argparse
# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Get item count from DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
args = parser.parse_args()
# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')
# テーブル名を引数から取得
table_name = args.table_name
# テーブル情報の取得
response = dynamodb.describe_table(TableName=table_name)
# アイテム数の取得
item_count = response['Table']['ItemCount']
print(f"The table {table_name} contains {item_count} items.")
アイテムをDynamoDBに追加するプログラム
ファイル名
insert_dynamodb_items.py
実行例
python3 insert_dynamodb_items.py User-hogehoge users.json
import boto3
import json
import argparse
# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Insert JSON data into DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
parser.add_argument('json_file', type=str, help='Path to JSON data file')
args = parser.parse_args()
# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')
resource = boto3.resource('dynamodb')
# テーブル名を引数から取得
table_name = args.table_name
table = resource.Table(table_name)
# テーブルのスキーマ情報からパーティションキー名を取得
key_schema = table.key_schema
partition_key = next(k['AttributeName'] for k in key_schema if k['KeyType'] == 'HASH')
# JSONファイルの読み込み
with open(args.json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# 各アイテムをDynamoDBテーブルに挿入し、パーティションキーのみ表示
for item in json_data:
response = dynamodb.put_item(
TableName=table_name,
Item=item
)
# パーティションキーのみ表示
partition_key_value = item[partition_key]['S'] # 文字列タイプのキー('S')の場合
print(f"Inserted item with {partition_key}: {partition_key_value}")
DBのテーブルから別テーブルにアイテムをコピー
ファイル名
copy_all_items.py
実行例
python3 copy_all_items.py source_table destination_table
import boto3
import argparse
# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルからテーブルへデータをコピーするスクリプト')
parser.add_argument('source_table', type=str, help='コピー元のDynamoDBテーブル名')
parser.add_argument('destination_table', type=str, help='コピー先のDynamoDBテーブル名')
args = parser.parse_args()
# DynamoDB クライアントの初期化
dynamodb = boto3.resource('dynamodb')
# コピー元テーブル(Aテーブル)
source_table = dynamodb.Table(args.source_table)
# コピー先テーブル(Bテーブル)
destination_table = dynamodb.Table(args.destination_table)
# ソーステーブルの全てのアイテムを取得
response = source_table.scan()
items = response['Items']
# 取得したアイテムをコピー先テーブルに挿入
with destination_table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
print(f"Copied item: {item}")
# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
response = source_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
items = response['Items']
with destination_table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
print(f"Copied item: {item}")
DynamoDBのテーブル名を取得するプログラム
ファイル名
get_db_table_names.py
実行例
python3 get_db_table_names.py
import boto3
# DynamoDB クライアントの初期化
dynamodb = boto3.client('dynamodb')
# 全テーブル名を取得
response = dynamodb.list_tables()
# テーブル名をソート
table_names = sorted(response['TableNames'])
# ソートされたテーブル名を出力
print("DynamoDB テーブル一覧(名前順):")
for table_name in table_names:
print(table_name)