記事の目的
DynamoDBをcloudshellからでも、ローカルからでも自由自在に扱うためのプログラム例集
対象者
DynamoDBをPythonから満遍なく操作したい人
テーブルアイテム削除プログラム
ファイル名
delete_all_table_items.py
実行例:
python3 delete_all_table_items.py Template-hogehoge
import boto3 import argparse # コマンドライン引数を解析 parser = argparse.ArgumentParser(description='DynamoDBテーブルの全アイテムを削除するスクリプト') parser.add_argument('table_name', type=str, help='削除対象のDynamoDBテーブル名') args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.resource('dynamodb') # 削除対象のテーブル table = dynamodb.Table(args.table_name) # テーブル内の全てのアイテムを取得 response = table.scan() items = response['Items'] # テーブルのスキーマ(パーティションキーとソートキーの情報)を取得 key_schema = table.key_schema partition_key = key_schema[0]['AttributeName'] sort_key = key_schema[1]['AttributeName'] if len(key_schema) > 1 else None # 取得したアイテムを削除 for item in items: # パーティションキーの値を取得 key = {partition_key: item[partition_key]} # ソートキーがある場合はソートキーも追加 if sort_key: key[sort_key] = item[sort_key] # アイテムを削除 table.delete_item(Key=key) print(f"Deleted item with {partition_key}: {item[partition_key]}") # ページネーション対応(アイテムが多い場合) while 'LastEvaluatedKey' in response: response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) items = response['Items'] for item in items: key = {partition_key: item[partition_key]} if sort_key: key[sort_key] = item[sort_key] table.delete_item(Key=key) print(f"Deleted item with {partition_key}: {item[partition_key]}")
テーブルのスキーマ取得
ファイル名
getTableSchema.sh
実行例
python3 getTableSchema.sh
for table in $(aws dynamodb list-tables --query "TableNames[]" --output text); do echo "Table: $table" aws dynamodb describe-table --table-name $table --query "Table.KeySchema" --output json done
テーブルのアイテム数取得プログラム
実行例
python3 getDBItemLength.sh
import boto3 import argparse # コマンドライン引数の設定 parser = argparse.ArgumentParser(description='Get item count from DynamoDB table.') parser.add_argument('table_name', type=str, help='DynamoDB table name') args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.client('dynamodb') # テーブル名を引数から取得 table_name = args.table_name # テーブル情報の取得 response = dynamodb.describe_table(TableName=table_name) # アイテム数の取得 item_count = response['Table']['ItemCount'] print(f"The table {table_name} contains {item_count} items.")
アイテムをDynamoDBに追加するプログラム
ファイル名
insert_dynamodb_items.py
実行例
python3 insert_dynamodb_items.py User-hogehoge users.json
import boto3 import json import argparse # コマンドライン引数の設定 parser = argparse.ArgumentParser(description='Insert JSON data into DynamoDB table.') parser.add_argument('table_name', type=str, help='DynamoDB table name') parser.add_argument('json_file', type=str, help='Path to JSON data file') args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.client('dynamodb') resource = boto3.resource('dynamodb') # テーブル名を引数から取得 table_name = args.table_name table = resource.Table(table_name) # テーブルのスキーマ情報からパーティションキー名を取得 key_schema = table.key_schema partition_key = next(k['AttributeName'] for k in key_schema if k['KeyType'] == 'HASH') # JSONファイルの読み込み with open(args.json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) # 各アイテムをDynamoDBテーブルに挿入し、パーティションキーのみ表示 for item in json_data: response = dynamodb.put_item( TableName=table_name, Item=item ) # パーティションキーのみ表示 partition_key_value = item[partition_key]['S'] # 文字列タイプのキー('S')の場合 print(f"Inserted item with {partition_key}: {partition_key_value}")
DBのテーブルから別テーブルにアイテムをコピー
ファイル名
copy_all_items.py
実行例
python3 copy_all_items.py source_table destination_table
import boto3 import argparse # コマンドライン引数を解析 parser = argparse.ArgumentParser(description='DynamoDBテーブルからテーブルへデータをコピーするスクリプト') parser.add_argument('source_table', type=str, help='コピー元のDynamoDBテーブル名') parser.add_argument('destination_table', type=str, help='コピー先のDynamoDBテーブル名') args = parser.parse_args() # DynamoDB クライアントの初期化 dynamodb = boto3.resource('dynamodb') # コピー元テーブル(Aテーブル) source_table = dynamodb.Table(args.source_table) # コピー先テーブル(Bテーブル) destination_table = dynamodb.Table(args.destination_table) # ソーステーブルの全てのアイテムを取得 response = source_table.scan() items = response['Items'] # 取得したアイテムをコピー先テーブルに挿入 with destination_table.batch_writer() as batch: for item in items: batch.put_item(Item=item) print(f"Copied item: {item}") # ページネーション対応(アイテムが多い場合) while 'LastEvaluatedKey' in response: response = source_table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) items = response['Items'] with destination_table.batch_writer() as batch: for item in items: batch.put_item(Item=item) print(f"Copied item: {item}")
DynamoDBのテーブル名を取得するプログラム
ファイル名
get_db_table_names.py
実行例
python3 get_db_table_names.py
import boto3 # DynamoDB クライアントの初期化 dynamodb = boto3.client('dynamodb') # 全テーブル名を取得 response = dynamodb.list_tables() # テーブル名をソート table_names = sorted(response['TableNames']) # ソートされたテーブル名を出力 print("DynamoDB テーブル一覧(名前順):") for table_name in table_names: print(table_name)