pytorchとSageMakerのビルトインパターンでの機械学習モデルデプロイ比較

今回はSageMakerを使用する前提で、

その中の手法のうち2種類の手法の比較を行いたいと思います。

記事の目的

同じくSageMakerを使う上で、

アーキテクチャをある程度自ら作成するpytorchパターンSageMakerのビルトインパターンの2種類での比較

画像出典元:https://pages.awscloud.com/rs/112-TZM-766/images/AWSの機械学習サービスとAmazon SageMaker の基礎.pdf

対象者

機械学習モデルをある程度理解していて、クラウドリソースを用いてモデルの訓練、デプロイを単純化したり、運用を楽にしたいと考える人

この記事を読み終わるまでの時間

およそ5m

関連、参考元記事

以下に詳細記事2つを貼ります。この記事では比較に絞って記載していますので

この記事を読んで概要をつかんだ後に直下のリンク先でより詳細な情報を掴みにいくと望ましいと思います。

pytorchパターン
https://github.com/aws/amazon-sagemaker-examples/blob/b01821341caf3d6af351e852b0dd9955db0e4515/sagemaker-python-sdk/pytorch_mnist/pytorch_mnist.ipynb

ビルトインパターン
https://github.com/aws/amazon-sagemaker-examples/blob/main/introduction_to_amazon_algorithms/semantic_segmentation_pascalvoc/semantic_segmentation_pascalvoc.ipynb

以下部分箇所の比較

1:モデル生成箇所比較

pytorchパターン

クラス定義

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
<br /># Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
 
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)

引数のminst.pyのmain関数で、このクラスを用いたtrain処理を呼び出すことで、estimatorとして生成している

01
02
03
04
05
06
07
08
09
10
11
from sagemaker.pytorch import PyTorch
 
estimator = PyTorch(entry_point='mnist.py',
role=role,
framework_version='1.1.0',
train_instance_count=2,
train_instance_type='ml.c4.xlarge',
hyperparameters={
'epochs': 6,
'backend': 'gloo'
})

ビルトインパターン

ビルトインアルゴリズムを使うための、コンテナイメージの取得

1
2
training_image = sagemaker.image_uris.retrieve("semantic-segmentation", sess.boto_region_name)
print(training_image)

コンテナイメージをEstimatorへ渡して生成

01
02
03
04
05
06
07
08
09
10
11
ss_estimator = sagemaker.estimator.Estimator(
training_image, # Container image URI
role, # Training job execution role with permissions to access our S3 bucket
instance_count=1,
instance_type="ml.p3.2xlarge",
volume_size=50, # in GB
max_run=360000, # in seconds
output_path=s3_output_location,
base_job_name="ss-notebook-demo",
sagemaker_session=sess,
)

ハイパーパラメータの設定

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
# Setup hyperparameters
ss_estimator.set_hyperparameters(
backbone="resnet-50", # This is the encoder. Other option is resnet-101
algorithm="fcn", # This is the decoder. Other options are 'psp' and 'deeplab'
use_pretrained_model="True", # Use the pre-trained model.
crop_size=240, # Size of image random crop.
num_classes=21, # Pascal has 21 classes. This is a mandatory parameter.
epochs=10, # Number of epochs to run.
learning_rate=0.0001,
optimizer="rmsprop", # Other options include 'adam', 'rmsprop', 'nag', 'adagrad'.
lr_scheduler="poly", # Other options include 'cosine' and 'step'.
mini_batch_size=16, # Setup some mini batch size.
validation_mini_batch_size=16,
early_stopping=True, # Turn on early stopping. If OFF, other early stopping parameters are ignored.
early_stopping_patience=2, # Tolerate these many epochs if the mIoU doens't increase.
early_stopping_min_epochs=10, # No matter what, run these many number of epochs.
num_training_samples=num_training_samples, # This is a mandatory parameter, 1464 in this case.
)

画像出典元:https://pages.awscloud.com/rs/112-TZM-766/images/AWSの機械学習サービスとAmazon SageMaker の基礎.pdf

2:訓練データ情報の引渡し方の違い

pytorchパターン

s3へデータアップロード

1
2
inputs = sagemaker_session.upload_data(path='data', bucket=bucket, key_prefix=prefix)
print('input spec (in this case, just an S3 path): {}'.format(inputs))

fit時にパス情報を引渡し

1
estimator.fit({'training': inputs})

ビルトインパターン

s3へのアップロード

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
%%time
train_channel = sess.upload_data(path="data/train", bucket=bucket, key_prefix=prefix + "/train")
print(train_channel)
train_annotation_channel = sess.upload_data(
path="data/train_annotation",
bucket=bucket,
key_prefix=prefix + "/train_annotation",
)
print(train_annotation_channel)
validation_channel = sess.upload_data(
path="data/validation", bucket=bucket, key_prefix=prefix + "/validation"
)
print(validation_channel)
validation_annotation_channel = sess.upload_data(
path="data/validation_annotation",
bucket=bucket,
key_prefix=prefix + "/validation_annotation",
)
print(validation_annotation_channel)

データチャンネルにパス情報を設定

余談:合わせてラベルマップによってラベルのカスタマイズ可能

01
02
03
04
05
06
07
08
09
10
11
12
13
distribution = "FullyReplicated"
 
data_channels = {
"train": sagemaker.inputs.TrainingInput(train_channel, distribution=distribution),
"validation": sagemaker.inputs.TrainingInput(validation_channel, distribution=distribution),
"train_annotation": sagemaker.inputs.TrainingInput(
train_annotation_channel, distribution=distribution
),
"validation_annotation": sagemaker.inputs.TrainingInput(
validation_annotation_channel, distribution=distribution
),
# 'label_map': label_map_channel
}

fit時にデータチャンネルを渡す

1
ss_estimator.fit(data_channels, logs=True)

3:デプロイは同一処理

pytorchパターン

1
predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

ビルトインパターン

1
ss_predictor = ss_estimator.deploy(initial_instance_count=1, instance_type="ml.c5.xlarge")

4:検証:Inference

pytorchパターン

検証

1
2
3
4
5
6
import numpy as np
 
image = np.array([data], dtype=np.float32)
response = predictor.predict(image)
prediction = response.argmax(axis=1)[0]
print(prediction)

ビルトインパターン

検証

1
2
3
4
5
6
7
8
9
%%time
 
cls_mask = ss_predictor.predict(imbytes)
 
print(type(cls_mask))
print(cls_mask.shape)
 
plt.imshow(cls_mask, cmap="jet")
plt.show()

※以降は比較というトピックからは少し話が逸れるので読み飛ばしても構わないが、

モデルの種類の関係上シリアライザーとデシリアライザーを加工するロジックを上記のpredictの前に実行することが望ましい

デシリアライザーの設定

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from PIL import Image
import numpy as np
 
class ImageDeserializer(sagemaker.deserializers.BaseDeserializer):
"""Deserialize a PIL-compatible stream of Image bytes into a numpy pixel array"""
 
def __init__(self, accept="image/png"):
self.accept = accept
 
@property
def ACCEPT(self):
return (self.accept,)
 
def deserialize(self, stream, content_type):
"""Read a stream of bytes returned from an inference endpoint.
Args:
stream (botocore.response.StreamingBody): A stream of bytes.
content_type (str): The MIME type of the data.
Returns:
mask: The numpy array of class labels per pixel
"""
try:
return np.array(Image.open(stream))
finally:
stream.close()
 
ss_predictor.deserializer = ImageDeserializer(accept="image/png")

シリアライザーの設定

1
2
3
4
5
6
ss_predictor.serializer = sagemaker.serializers.IdentitySerializer("image/jpeg")
 
with open(filename, "rb") as imfile:
imbytes = imfile.read()
 
# Extension exercise: Could you write a custom serializer which takes a filename as input instead?

5:エンドポイントの削除

pytorchパターン、ビルトインパターン共通

1
estimator.delete_endpoint()

まとめ

ここまでで、各機能の比較は終わりです

どちらの手法でもSageMakerによって、機械学習プロジェクトで起こりがちな

下記問題にアプローチできていることがわかります。

  • 環境構築が大変
  • 複数の学習ジョブを並列で実行するのが大変
  • 複数マシンを使った分散学習を実現するのが大変
  • 推論用の API サーバ構築とメンテが大変

画像出典元:https://pages.awscloud.com/rs/112-TZM-766/images/AWSの機械学習サービスとAmazon SageMaker の基礎.pdf

その上でpytorchやtensorflowなど、フレームワークのアップデート状況を加味しつつ、適切な選択を行うことが望ましいと思います。

Appendix:

少し詳細情報になるので、本題が終わったところで情報追加となります。

pytorchバージョンで実装を行う場合、軽く上記した通りメインのコードの加工を少々行う必要があります。

修正する上でのポイントは以下となります。

  • main関数の中で–batch-sizeなどのハイパーパラメータを受け取れるように記載する
  • main関数で同ファイル内のtrain関数を呼び出す
  • SageMakerのEstimatorに対応するために、以下の関数をファイル内に定義しておく必要がある
  1. model_fn(モデルの読み込み)
  2. input_fn(前処理)
  3. predict_fn(予測)
  4. output_fn(後処理)

※推論時は2→3→4の順で処理が実行される

以下引用コード:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import argparse
import json
import logging
import os
import sagemaker_containers
import sys
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
from torchvision import datasets, transforms
 
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
 
# Based on https://github.com/pytorch/examples/blob/master/mnist/main.py
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
 
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
 
def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs):
logger.info("Get train data loader")
dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None,
sampler=train_sampler, **kwargs)
 
def _get_test_data_loader(test_batch_size, training_dir, **kwargs):
logger.info("Get test data loader")
return torch.utils.data.DataLoader(
datasets.MNIST(training_dir, train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=test_batch_size, shuffle=True, **kwargs)
 
def _average_gradients(model):
# Gradient averaging.
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
param.grad.data /= size
 
def train(args):
is_distributed = len(args.hosts) > 1 and args.backend is not None
logger.debug("Distributed training - {}".format(is_distributed))
use_cuda = args.num_gpus > 0
logger.debug("Number of gpus available - {}".format(args.num_gpus))
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
device = torch.device("cuda" if use_cuda else "cpu")
 
if is_distributed:
# Initialize the distributed environment.
world_size = len(args.hosts)
os.environ['WORLD_SIZE'] = str(world_size)
host_rank = args.hosts.index(args.current_host)
os.environ['RANK'] = str(host_rank)
dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(
args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
dist.get_rank(), args.num_gpus))
 
# set the seed for generating random numbers
torch.manual_seed(args.seed)
if use_cuda:
torch.cuda.manual_seed(args.seed)
 
train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)
 
logger.debug("Processes {}/{} ({:.0f}%) of train data".format(
len(train_loader.sampler), len(train_loader.dataset),
100. * len(train_loader.sampler) / len(train_loader.dataset)
))
 
logger.debug("Processes {}/{} ({:.0f}%) of test data".format(
len(test_loader.sampler), len(test_loader.dataset),
100. * len(test_loader.sampler) / len(test_loader.dataset)
))
 
model = Net().to(device)
if is_distributed and use_cuda:
# multi-machine multi-gpu case
model = torch.nn.parallel.DistributedDataParallel(model)
else:
# single-machine multi-gpu case or single-machine or multi-machine cpu case
model = torch.nn.DataParallel(model)
 
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
 
for epoch in range(1, args.epochs + 1):
model.train()
for batch_idx, (data, target) in enumerate(train_loader, 1):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
if is_distributed and not use_cuda:
# average gradients manually for multi-machine cpu case only
_average_gradients(model)
optimizer.step()
if batch_idx % args.log_interval == 0:
logger.info('Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.sampler),
100. * batch_idx / len(train_loader), loss.item()))
test(model, test_loader, device)
save_model(model, args.model_dir)
 
def test(model, test_loader, device):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
 
test_loss /= len(test_loader.dataset)
logger.info('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)))
 
def model_fn(model_dir):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.nn.DataParallel(Net())
with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
model.load_state_dict(torch.load(f))
return model.to(device)
 
def save_model(model, model_dir):
logger.info("Saving the model.")
path = os.path.join(model_dir, 'model.pth')
# recommended way from http://pytorch.org/docs/master/notes/serialization.html
torch.save(model.cpu().state_dict(), path)
 
if __name__ == '__main__':
parser = argparse.ArgumentParser()
 
# Data and model checkpoints directories
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=100, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--backend', type=str, default=None,
help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')
 
# Container environment
parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))
parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])
 
train(parser.parse_args())

他関連資料:SageMakerの基礎資料

https://pages.awscloud.com/rs/112-TZM-766/images/Amazon SageMaker の基礎.pdf

共通点概要

  • S3に訓練データを格納し、並列トレーニングを実施している点
  • SageMakerによって、訓練、デプロイを行える点