はじめに

streampackチームのminsuです。
先日はステッパーを購入してとうとう室内で有酸素運動までできるようになり、接触10割減、ビタミンD未生成生活が捗っております。

今回は最近よく使っているサーバーレスアプリケーションの理解を深めるために API Gateway で Websocket API を構築して、よくあるチャット機能を実装してみます。
構成は API Gateway + Lambda + DynamoDB です。

API Gateway

作成

APIタイプを選択 >WebSocket API> 構築

ここでルート選択式を$request.body.actionに設定しています。
クライアントからリクエストを送った際に

{"action":"sendmessage","data":"hello"}

ルート選択式で設定した actionプロパティの値 sendmessageの動作を API Gateway で実行することができます。

新しいルートキーで sendmessageを追加します。
各ルートは次の用途で利用されます。

ルート 説明
$connect クライアントがWebSocket通信を開始する時に利用
$disconnect クライアントがWebSocket通信を終了する時に利用
sendmessage ルートキー sendmessage に合致時に利用
$default 該当する ルートキー が無い場合に利用

チャットAPIの処理

メッセージを送ると WebSocket 接続中のクライアント全体にメッセージが届くAPIを実装してみます。

ルート 処理
$connect クライアントごとの API Gateway の connectionID を dynamoDB に保存する
sendmessage DynamoDB から API Gateway の connectionID 一覧を取得して messageとして送られてきた文字を WebSocket 接続中のクライアント全体に送る
$disconnect クライアントの connectionID を dynamoDB から削除する
$default 利用しない

DynamoDB

パーティションキー : connection_id

*実際はチャットルームを分けたいとおもうので
バーティションキー : room_id
ソートキー : connection_id
のようになるかと思います。

Lambda

DynamoDB の作成がおわったので、API Gateway 各ルートから呼び出す Lambda 関数を作成していきます。

ランタイム : Node.js 12.x
Lambda のロールに AmazonAPIGatewayInvokeFullAccess ポリシーと下記ポリシーを付与しました。

OnConnect

OnConnect

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
  const TABLE_NAME = 'My DynamoDB';
  const params = {
    TableName: TABLE_NAME,
    Item: {
      'connection_id': event.requestContext.connectionId
    }
  };

  docClient.put(params, (err) => {
    if (err) {
      console.log("Error", err);
      return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
    } else {
      console.log("success OnConnect");
    }
  });
  const response = {
      statusCode: 200,
      body: JSON.stringify(""),
  };

  callback(null,response);
};

OnDisconnect

OnConnect とは逆に DynamoDB から connectionId を削除します。

OnDisconnect

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
  const TABLE_NAME = 'My DynamoDB';
  const params = {
    TableName: TABLE_NAME,
    Key: {
      'connection_id': event.requestContext.connectionId
    }
  };

  docClient.delete(params, (err) => {
    if (err) {
      console.log("Error", err);
      return { statusCode: 500, body: 'Failed to disconnect: ' + JSON.stringify(err) };
    } else {
      console.log("success OnDisconnect");
    }
  });
  const response = {
      statusCode: 200,
      body: JSON.stringify(""),
  };

  callback(null,response);
};

SendMessage

DynamoDB を scan して connection_id 一覧を取得し、
全接続先に対して message を送信します。

SendMessage

const AWS = require("aws-sdk");
const docClient = new AWS.DynamoDB.DocumentClient();
exports.handler = async (event, context, callback) => {
    const TABLE_NAME = 'My DynamoDB';
    console.log("SendMessage");

    const agma = new AWS.ApiGatewayManagementApi({
        apiVersion: "2018-11-29",
        endpoint: event.requestContext.domainName + "/" + event.requestContext.stage
    });

    const params = {
        TableName: TABLE_NAME,
    };

    docClient.scan(params, function(err,data){
        if (err) {
            console.log("Error", err);
            return { statusCode: 500, body: 'Failed to scan: ' + JSON.stringify(err) };
        } else {
            let postParams = {
                Data : JSON.parse(event.body).data
            };
            console.log(data);
            data.Items.forEach((element) => {
                postParams.ConnectionId = element.connection_id;
                agma.postToConnection(postParams,(err) => {
                    if (err) {
                        console.log("Error", err);
                        return { statusCode: 500, body: 'Faild to Post: ' + JSON.stringify(err) };
                    }
                });
            });
        }
    });
    console.log("sendmessage");
    const response = {
        statusCode: 200,
        body: JSON.stringify(""),
    };

    callback(null,response);
};

API Gateway デプロイ

再び API Gateway に戻り、各ルートに Lambda関数呼び出しを設定します。
統合タイプ : Lambda関数
Lambdaプロキシ統合の利用 : true

ルート Lambda関数 備考
$connect OnConnect
$disconnect OnDisconnect
sendmessage SendMessage
$default OnConnect 未設定だとデプロイできないので

アクション > APIのデプロイ

これにより、次のようなurlでwebsocket接続が可能になります。

wss://xxxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev

接続テスト

wscat がおすすめのようなので利用します。
npm install

$ npm install -g wscat
$ wscat -c wss:///xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev
connected (press CTRL+C to quit)
> { "action": "sendmessage", "data": "message" }
<message

複数接続後sendmessageして、全接続先にmessageが届くことを確認。

WebSocket 接続

画面作成の動きはだいたい下記の通りに作成しました。
cnclose時にWebSocket再接続させたいのですがもっと良い方法等あれば教えていただきたいです。

  const uri = "wss://xxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev";
  let ws;

  function startWebsocket(){
    ws = new WebSocket(uri);
    ws.onopen = function(e) {
      // WebSocket接続
      // ルート $onconnect
    };
    ws.onmessage = function(e) {
      // WebSocket接続先からデータ受け取り
      // いずれかのクライアントが sendMessage すると呼び出される
    };
    ws.onclose = function(e) {
      // WebSocket接続終了
      // ルート $disconnect
      setTimeout(function() {
        startWebsocket();
      }, 500);
    };
    ws.onerror = function(e) {
      // 
    };
  }

  function sendMessage(message) {
    ws.send(`{ "action": "sendmessage", "data": "${message}"}`);
    // ルート sendmessage
  }

send でリクエスト送信
onmessage で受け取り
することでチャット機能を実装できました。

テスト

liveプレイヤーにオーバーレイして動画サイトの生放送風にしてみました。
複数ページを開いてもmessageを受け取れていることが確認できました。

利用動画 : (c)copyright 2008、Blender Foundation / www.bigbuckbunny.org

まとめ

一通り実装することができました。
サーバーレスでの WebSocket 接続はできることも多そうなので使う機会があれば活かしていきたいと思います。

参考

https://github.com/aws-samples/simple-websockets-chat-app
https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/websocket-api-develop.html

元記事はこちら

API Gateway で WebSocket を利用したAPI作成