やりたいこと

Boto3 を利用して以下のような DynamoDB の各種操作を行いたい。

  • テーブル作成
  • テーブル一覧を取得
  • テーブルにデータを追加(put_item
  • テーブルからデータを全件取得(scan
  • テーブルのデータを取得(query)
  • テーブルのデータを更新(update_item

準備

試した環境

試した環境は以下の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1004

$ python --version
Python 2.7.11

DynamoDB Local on Docker

DynamoDB Local を以下のような Dockerfile で Docker コンテナで動かしておく。

FROM java:openjdk-8
RUN mkdir /usr/local/ddlocal
WORKDIR /usr/local/ddlocal
RUN \
  wget http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz && \
  tar zxvf dynamodb_local_latest.tar.gz && \
  rm -f dynamodb_local_latest.tar.gz && \
  mkdir db
EXPOSE 8080
CMD java -Djava.library.path=DynamoDBLocal_lib -jar DynamoDBLocal.jar -port 8080 -dbPath ./db

以下のように docker run する。

$ docker built -t dynamodb-local
$ docker run --name=dynamo -p 8080:8080 -d dynamodb-local

一応、起動を確認。

bash-3.2$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
223d205d6513        dynamodb-local      "/bin/sh -c 'java -Dj"   5 days ago          Up 8 seconds        0.0.0.0:8080->8080/tcp   dynamo

Boto3 をインストール

$ cat requirements.txt
boto3
$ pip install -r requirements.txt

sample スクリプト

Client と Resource

上記のドキュメントを読むと DynamoDB を操作するあたり、以下のような機能(インターフェース)が提供されている。

機能 クラス名 概要
Client DynamoDB.Client 低レベルな操作が可能なインターフェース
Paginators DynamoDB.Paginator.* 自動的なページングを提供
Waiters DynamoDB.Waiter.* 特定の状態に達するまでのブロックを提供
Service Resource DynamoDB.ServiceResource 高レベルのオブジェクト指向インタフェース
Table DynamoDB.Table テーブル操作

当初は Client インターフェースを使って、DynamoDB に接続していたけど、色々といじっているうちに Service Resource の方が直感的に書ける気がしてきたので、以下の sample スクリプトでは Service Resource を使うことにした。(テーブル操作だけは Table インターフェースを利用したけど。)

sample スクリプト

# -*- coding: utf-8 -*-

import boto3
from boto3.dynamodb.conditions import Key, Attr

'''
Docker コンテナで DynamoDB Local を立ち上げておく
 - _DYNAMO_HOST: DynamoDB Local のホスト名
 - _DYNAMO_PORT: DynamoDB Local のポート番号
'''
_DYNAMO_HOST = 'xxx.xxx.xxx.xxx'
_DYNAMO_PORT = 'xxxx'

class DynamoSample:

    def __init__(self):
        self.dynamodb = boto3.resource(
             'dynamodb',
             region_name = 'ap-northeast-1',
             endpoint_url = 'http://' + _DYNAMO_HOST + ':' + _DYNAMO_PORT)

    def create_table(self, table_name):
        try:
            res = self.dynamodb.create_table(
                AttributeDefinitions = [
                    {
                        'AttributeName': 'name',
                        'AttributeType': 'S'
                    },
                ],
                TableName = table_name,
                KeySchema = [
                    {
                        'AttributeName': 'name',
                        'KeyType': 'HASH'
                    },
                ],
                ProvisionedThroughput = {
                    'ReadCapacityUnits': 1,
                    'WriteCapacityUnits': 1
                }
            )
            return res
        except Exception, e:
            return e


    def tables(self):
        try:
            res = self.dynamodb.tables.all()
            return res
        except Exception, e:
            return e

    def put_item(self, table_name):
        table = self.dynamodb.Table(table_name)
        try:
            res = table.put_item(
                 Item = { 
                     'name': 'foo',
                     'flag': True
                 }
            )
            return res
        except Exception, e:
            return e

    def scan(self, table_name):
        table = self.dynamodb.Table(table_name)
        try:
            res = table.scan()
            return res
        except Exception, e:
            return e

    def query(self, table_name, key):
        table = self.dynamodb.Table(table_name)
        try:
            res = table.query(
                 KeyConditionExpression = Key('name').eq(key)
            )
            return res
        except Exception, e:
            return e

    def update_item(self, table_name, key):
        table = self.dynamodb.Table(table_name)
        try:
            res = table.update_item(
                Key = {
                     'name': key
                },
                AttributeUpdates = {
                     'flag':{
                         'Action': 'PUT',
                         'Value': False
                     }
                },
                ReturnValues = 'UPDATED_NEW'
            )
            return self.query(table_name, key)
        except Exception, e:
            return e

'''
 色々と試してみる
'''
dynamo = DynamoSample()

# テーブル作成
# print dynamo.create_table('hoge')

# テーブル一覧を取得
print '*** テーブル一覧を取得 ***'
for table in dynamo.tables():
    print table.table_name
print ''

# 項目を追加する
print '*** 項目を追加する ***'
print dynamo.put_item('foo')['ResponseMetadata']['HTTPStatusCode']
print ''

# テーブルを scan する
print '*** テーブルを scan する ***'
for item in dynamo.scan('foo')['Items']:
    print item
print ''

# 項目を取得する
print '*** 項目を取得する ***'
for item in  dynamo.query('foo', 'foo')['Items']:
    print item
print ''

# 項目を更新する
print '*** 項目を更新する ***'
for item in dynamo.update_item('foo', 'foo')['Items']:
    print item
print ''

output

$ python demo.py
*** テーブル一覧を取得 ***
foo
hoge

*** 項目を追加する ***
200

*** テーブルを scan する ***
{u'flag': True, u'name': u'foo'}

*** 項目を取得する ***
{u'flag': True, u'name': u'foo'}

*** 項目を更新する ***
{u'flag': False, u'name': u'foo'}

以上

メモでした。

Python コーディング規約に準拠していない部分があったりするので、せめてそのあたりは準拠して書けるようになりたいと思う今日この頃。

元記事はこちら

Boto3 から DynamoDB の各種操作メモ(テーブル一覧取得、データ追加、データ取得、データ更新)