記事の目的
DynamoDBをcloudshellからでも、ローカルからでも自由自在に扱うためのプログラム例集
対象者
DynamoDBをPythonから満遍なく操作したい人
テーブルアイテム削除プログラム
ファイル名
delete_all_table_items.py
実行例:
python3 delete_all_table_items.py Template-hogehoge
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import boto3 import argparse # コマンドライン引数を解析 parser = argparse.ArgumentParser(description = 'DynamoDBテーブルの全アイテムを削除するスクリプト' ) parser.add_argument( 'table_name' , type = str , help = '削除対象のDynamoDBテーブル名' ) args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.resource( 'dynamodb' ) # 削除対象のテーブル table = dynamodb.Table(args.table_name) # テーブル内の全てのアイテムを取得 response = table.scan() items = response[ 'Items' ] # テーブルのスキーマ(パーティションキーとソートキーの情報)を取得 key_schema = table.key_schema partition_key = key_schema[ 0 ][ 'AttributeName' ] sort_key = key_schema[ 1 ][ 'AttributeName' ] if len (key_schema) > 1 else None # 取得したアイテムを削除 for item in items: # パーティションキーの値を取得 key = {partition_key: item[partition_key]} # ソートキーがある場合はソートキーも追加 if sort_key: key[sort_key] = item[sort_key] # アイテムを削除 table.delete_item(Key = key) print (f "Deleted item with {partition_key}: {item[partition_key]}" ) # ページネーション対応(アイテムが多い場合) while 'LastEvaluatedKey' in response: response = table.scan(ExclusiveStartKey = response[ 'LastEvaluatedKey' ]) items = response[ 'Items' ] for item in items: key = {partition_key: item[partition_key]} if sort_key: key[sort_key] = item[sort_key] table.delete_item(Key = key) print (f "Deleted item with {partition_key}: {item[partition_key]}" ) |
テーブルのスキーマ取得
ファイル名
getTableSchema.sh
実行例
python3 getTableSchema.sh
1 2 3 4 | for table in $(aws dynamodb list-tables --query "TableNames[]" --output text); do echo "Table: $table" aws dynamodb describe-table --table-name $table --query "Table.KeySchema" --output json done |
テーブルのアイテム数取得プログラム
実行例
python3 getDBItemLength.sh
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 | import boto3 import argparse # コマンドライン引数の設定 parser = argparse.ArgumentParser(description = 'Get item count from DynamoDB table.' ) parser.add_argument( 'table_name' , type = str , help = 'DynamoDB table name' ) args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.client( 'dynamodb' ) # テーブル名を引数から取得 table_name = args.table_name # テーブル情報の取得 response = dynamodb.describe_table(TableName = table_name) # アイテム数の取得 item_count = response[ 'Table' ][ 'ItemCount' ] print (f "The table {table_name} contains {item_count} items." ) |
アイテムをDynamoDBに追加するプログラム
ファイル名
insert_dynamodb_items.py
実行例
python3 insert_dynamodb_items.py User-hogehoge users.json
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import boto3 import json import argparse # コマンドライン引数の設定 parser = argparse.ArgumentParser(description = 'Insert JSON data into DynamoDB table.' ) parser.add_argument( 'table_name' , type = str , help = 'DynamoDB table name' ) parser.add_argument( 'json_file' , type = str , help = 'Path to JSON data file' ) args = parser.parse_args() # DynamoDBクライアントの初期化 dynamodb = boto3.client( 'dynamodb' ) resource = boto3.resource( 'dynamodb' ) # テーブル名を引数から取得 table_name = args.table_name table = resource.Table(table_name) # テーブルのスキーマ情報からパーティションキー名を取得 key_schema = table.key_schema partition_key = next (k[ 'AttributeName' ] for k in key_schema if k[ 'KeyType' ] = = 'HASH' ) # JSONファイルの読み込み with open (args.json_file, 'r' , encoding = 'utf-8' ) as f: json_data = json.load(f) # 各アイテムをDynamoDBテーブルに挿入し、パーティションキーのみ表示 for item in json_data: response = dynamodb.put_item( TableName = table_name, Item = item ) # パーティションキーのみ表示 partition_key_value = item[partition_key][ 'S' ] # 文字列タイプのキー('S')の場合 print (f "Inserted item with {partition_key}: {partition_key_value}" ) |
DBのテーブルから別テーブルにアイテムをコピー
ファイル名
copy_all_items.py
実行例
python3 copy_all_items.py source_table destination_table
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import boto3 import argparse # コマンドライン引数を解析 parser = argparse.ArgumentParser(description = 'DynamoDBテーブルからテーブルへデータをコピーするスクリプト' ) parser.add_argument( 'source_table' , type = str , help = 'コピー元のDynamoDBテーブル名' ) parser.add_argument( 'destination_table' , type = str , help = 'コピー先のDynamoDBテーブル名' ) args = parser.parse_args() # DynamoDB クライアントの初期化 dynamodb = boto3.resource( 'dynamodb' ) # コピー元テーブル(Aテーブル) source_table = dynamodb.Table(args.source_table) # コピー先テーブル(Bテーブル) destination_table = dynamodb.Table(args.destination_table) # ソーステーブルの全てのアイテムを取得 response = source_table.scan() items = response[ 'Items' ] # 取得したアイテムをコピー先テーブルに挿入 with destination_table.batch_writer() as batch: for item in items: batch.put_item(Item = item) print (f "Copied item: {item}" ) # ページネーション対応(アイテムが多い場合) while 'LastEvaluatedKey' in response: response = source_table.scan(ExclusiveStartKey = response[ 'LastEvaluatedKey' ]) items = response[ 'Items' ] with destination_table.batch_writer() as batch: for item in items: batch.put_item(Item = item) print (f "Copied item: {item}" ) |
DynamoDBのテーブル名を取得するプログラム
ファイル名
get_db_table_names.py
実行例
python3 get_db_table_names.py
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 | import boto3 # DynamoDB クライアントの初期化 dynamodb = boto3.client( 'dynamodb' ) # 全テーブル名を取得 response = dynamodb.list_tables() # テーブル名をソート table_names = sorted (response[ 'TableNames' ]) # ソートされたテーブル名を出力 print ( "DynamoDB テーブル一覧(名前順):" ) for table_name in table_names: print (table_name) |