記事の目的

DynamoDBをcloudshellからでも、ローカルからでも自由自在に扱うためのプログラム例集

対象者

DynamoDBをPythonから満遍なく操作したい人

テーブルアイテム削除プログラム

ファイル名

delete_all_table_items.py

実行例:

python3 delete_all_table_items.py Template-hogehoge

import boto3
import argparse

# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルの全アイテムを削除するスクリプト')
parser.add_argument('table_name', type=str, help='削除対象のDynamoDBテーブル名')
args = parser.parse_args()

# DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb')

# 削除対象のテーブル
table = dynamodb.Table(args.table_name)

# テーブル内の全てのアイテムを取得
response = table.scan()
items = response['Items']

# テーブルのスキーマ(パーティションキーとソートキーの情報)を取得
key_schema = table.key_schema
partition_key = key_schema[0]['AttributeName']
sort_key = key_schema[1]['AttributeName'] if len(key_schema) > 1 else None

# 取得したアイテムを削除
for item in items:
    # パーティションキーの値を取得
    key = {partition_key: item[partition_key]}

    # ソートキーがある場合はソートキーも追加
    if sort_key:
        key[sort_key] = item[sort_key]

    # アイテムを削除
    table.delete_item(Key=key)
    print(f"Deleted item with {partition_key}: {item[partition_key]}")

# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items = response['Items']
    for item in items:
        key = {partition_key: item[partition_key]}
        if sort_key:
            key[sort_key] = item[sort_key]
        table.delete_item(Key=key)
        print(f"Deleted item with {partition_key}: {item[partition_key]}")

テーブルのスキーマ取得

ファイル名

getTableSchema.sh

実行例

python3 getTableSchema.sh

for table in $(aws dynamodb list-tables --query "TableNames[]" --output text); do
  echo "Table: $table"
  aws dynamodb describe-table --table-name $table --query "Table.KeySchema" --output json
done

テーブルのアイテム数取得プログラム

実行例

python3 getDBItemLength.sh

import boto3
import argparse

# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Get item count from DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
args = parser.parse_args()

# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')

# テーブル名を引数から取得
table_name = args.table_name

# テーブル情報の取得
response = dynamodb.describe_table(TableName=table_name)

# アイテム数の取得
item_count = response['Table']['ItemCount']

print(f"The table {table_name} contains {item_count} items.")

アイテムをDynamoDBに追加するプログラム

ファイル名

insert_dynamodb_items.py

実行例

python3 insert_dynamodb_items.py User-hogehoge users.json

import boto3
import json
import argparse

# コマンドライン引数の設定
parser = argparse.ArgumentParser(description='Insert JSON data into DynamoDB table.')
parser.add_argument('table_name', type=str, help='DynamoDB table name')
parser.add_argument('json_file', type=str, help='Path to JSON data file')
args = parser.parse_args()

# DynamoDBクライアントの初期化
dynamodb = boto3.client('dynamodb')
resource = boto3.resource('dynamodb')

# テーブル名を引数から取得
table_name = args.table_name
table = resource.Table(table_name)

# テーブルのスキーマ情報からパーティションキー名を取得
key_schema = table.key_schema
partition_key = next(k['AttributeName'] for k in key_schema if k['KeyType'] == 'HASH')

# JSONファイルの読み込み
with open(args.json_file, 'r', encoding='utf-8') as f:
    json_data = json.load(f)

# 各アイテムをDynamoDBテーブルに挿入し、パーティションキーのみ表示
for item in json_data:
    response = dynamodb.put_item(
        TableName=table_name,
        Item=item
    )
    # パーティションキーのみ表示
    partition_key_value = item[partition_key]['S']  # 文字列タイプのキー('S')の場合
    print(f"Inserted item with {partition_key}: {partition_key_value}")

DBのテーブルから別テーブルにアイテムをコピー

ファイル名

copy_all_items.py

実行例

python3 copy_all_items.py source_table destination_table

import boto3
import argparse

# コマンドライン引数を解析
parser = argparse.ArgumentParser(description='DynamoDBテーブルからテーブルへデータをコピーするスクリプト')
parser.add_argument('source_table', type=str, help='コピー元のDynamoDBテーブル名')
parser.add_argument('destination_table', type=str, help='コピー先のDynamoDBテーブル名')
args = parser.parse_args()

# DynamoDB クライアントの初期化
dynamodb = boto3.resource('dynamodb')

# コピー元テーブル(Aテーブル)
source_table = dynamodb.Table(args.source_table)

# コピー先テーブル(Bテーブル)
destination_table = dynamodb.Table(args.destination_table)

# ソーステーブルの全てのアイテムを取得
response = source_table.scan()
items = response['Items']

# 取得したアイテムをコピー先テーブルに挿入
with destination_table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)
        print(f"Copied item: {item}")

# ページネーション対応(アイテムが多い場合)
while 'LastEvaluatedKey' in response:
    response = source_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items = response['Items']
    with destination_table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
            print(f"Copied item: {item}")

DynamoDBのテーブル名を取得するプログラム

ファイル名

get_db_table_names.py

実行例

python3 get_db_table_names.py

import boto3

# DynamoDB クライアントの初期化
dynamodb = boto3.client('dynamodb')

# 全テーブル名を取得
response = dynamodb.list_tables()

# テーブル名をソート
table_names = sorted(response['TableNames'])

# ソートされたテーブル名を出力
print("DynamoDB テーブル一覧(名前順):")
for table_name in table_names:
    print(table_name)