Momentoでリアルタイムチャットアプリケーションを構築する方法:ステップバイステップガイド

Yan Cui

リアルタイムアプリケーションはますます普及しており、ほとんどの人がWhatsAppのようなリアルタイムチャットアプリケーションを1つ以上使っています。

このようなアプリケーションの重要な課題の一つは、ユーザー接続を効率的に管理することです。多数のユーザーが接続しているかもしれませんが、これらの接続はほとんどの時間アイドル状態です。

残念なことに、WebSocketサポートを提供する多くのサービス(API Gateway、AppSync、IoT Coreなど)は、接続時間に対して課金されます。つまり、ユーザーが積極的にメッセージを送受信していなくても、接続を維持するために料金を支払わなければならないのです。

アプリケーションの規模が大きくなるにつれて、これは非常に非効率になる可能性があります。そこでMomento Topicsの出番です!

MomentoのTopicsは、WebSocketの難しい部分を抽象化し、使用した分だけ課金されるオンデマンド価格になっています。

このガイドでは、Momento Topicsを使用してリアルタイム・チャット・アプリケーションを構築する方法を説明します。

このデモのコードはすべて、このレポジトリ にあります。

前提条件

このガイドの前提条件は以下の通りです:

・Momentoアカウント: gomento.com で無料アカウントにサインアップできます。
・AWSアカウント。
・Node.js
・CDK v2

Momento Topicsについて

Momento Topicsで重要なことは、仮想であるということです。つまり、使用する前に明示的にトピックを作成する必要はありません。

このことを覚えておいてください。

使い捨てトークンを理解する

Momentoには、認証と認可のためにAPIキーと使い捨てトークンという2つの独立したメカニズムがあります。

バックエンドのプロセスには API キーを使用します。

フロントエンドでは、Momento SDK を使用して使い捨てトークンを生成できます。これらのトークンは、特定のリソースに対する特定のアクションにスコープすることができます。

例えば、トークンは特定のトピックの公開と購読の許可を与えることができます。すべてのチャットルームにトピックがある場合、使い捨てトークンを使用して、ユーザーがアクセスできるルームを制御することができます。

アーキテクチャの概要

このデモでは、フルスタックのアプリケーションを構築します:

・ログインすると、利用可能なチャットルームのリストが表示されます。
・ユーザは新しいチャットルームを作成したり、利用可能なチャットルームに参加することができます。
・チャットルームのリストはDynamoDBに保存されます。
・すべてのチャットルームは、Momentoに関連するトピックを持っています。
・ユーザがチャットルームに参加すると、フロントエンドはそのチャットルームのトピックを購読します。
・ユーザーがルームにメッセージを送信すると、そのメッセージはトピックに公開され、ルーム内のすべてのユーザーに配信されます。

APIには3つのルートがあります:

POST /chats: 新しいチャットルームを作成します。
GET /chats: 利用可能なチャットルームをリストアップします。
GET /token: 使い捨てトークンを生成し、フロントエンドがMomentoトピックを購読できるようにします。

これが私たちの全体的なアーキテクチャーです:

ステップ1:新しいキャッシュの作成

MomentoトピックはMomentoキャッシュと同じインフラを共有しています。トピックを使用するには、まずキャッシュを作成する必要があります。

Momentoコンソールにログインし、「キャッシュ 」に移動します。

“Create cache “をクリックします。

この新しいキャッシュを 「chat 」と呼び、「us-east-1 」リージョンに作成します。

これは、Momento CLI からも実行できます。

ステップ2:APIキーの生成

Momentoには2種類のAPIキーがあります:

・Super User キー: キャッシュやトピックの管理、使い捨てトークンの生成に使用します。
・Fine Grained キー: Momentキャッシュ/トピックと対話するためのものです。

使い捨てトークンを生成するには、Super User キーが必要です。そのため、今回はスーパーユーザーキーを作成する必要があります。

Momentoコンソールの 「API keys 」に移動し、「us-east-1 」リージョンに新しい 「Super User Key 」を生成します。

Generate Api Key” をクリックします。

ステップ3:APIキーの保管

APIキーを安全に保管するために、SSM Parameter Storeに保存します。

AWSの 「AWS Systems Manager 」コンソールに行き、「Parameter Store 」をクリックします。

新しいパラメーターを作成し、そのパラメーターを「/chat-api/dev/momento-api-key 」とします。これは私が通常SSMパラメータに使っている命名規則です: /{サービス名}/{環境}/{パラメータ名}。

パラメータのタイプが 「SecureString 」であることを確認します。これにより、APIキーが暗号化されます。

ステップ4:CDKアプリの作成

このデモではCDKを使用します。しかし、他のInfrastructure-as-Codeツールでも同じように動作します。

CDKアプリのために、以下のことを行います:

1..サーバーレス開発にとって最もインパクトのあるプラクティスのひとつであるエフェメラル環境 をサポートする。
2.エフェメラル環境で、メイン環境(devなど)のSSMパラメータを再利用できるようにする。

そこで、stageNamessmStageNameという2つのコンテキスト変数を取り込みます。

stageNameは、名前の衝突を避けるために、作成するすべてのAWSリソースの名前に含まれます。

ssmStageNameは、参照するすべてのSSMパラメータでstageNameの代わりに使用されます。

これらを念頭に置いて、CDKのアプリを紹介します。

#!/usr/bin/env node

const cdk = require('aws-cdk-lib');
const { ChatApiStack } = require('./constructs/chat-api-stack');

const app = new cdk.App();

let stageName = app.node.tryGetContext('stageName');
let ssmStageName = app.node.tryGetContext('ssmStageName');

if (!stageName) {
  console.log('Defaulting stage name to dev');
  stageName = 'dev';
}

if (!ssmStageName) {
  console.log(`Defaulting SSM stage name to "stageName": ${stageName}`);
  ssmStageName = stageName;
}

const serviceName = 'chat-api';

new ChatApiStack(app, `ChatApiStack-${stageName}`, {
  serviceName,
  stageName,
  ssmStageName,
});

そして、ChatApiStackです:

const { Stack, CfnOutput } = require('aws-cdk-lib');
const { Runtime } = require('aws-cdk-lib/aws-lambda');
const { NodejsFunction } = require('aws-cdk-lib/aws-lambda-nodejs');
const { Table, BillingMode, AttributeType } = require('aws-cdk-lib/aws-dynamodb');
const { RestApi, LambdaIntegration, CfnAuthorizer, AuthorizationType } = require('aws-cdk-lib/aws-apigateway');
const iam = require('aws-cdk-lib/aws-iam');
const { UserPool, UserPoolClient } = require('aws-cdk-lib/aws-cognito');

const MOMENTO_CACHE_NAME = 'chat';

class ChatApiStack extends Stack {
  constructor(scope, id, props) {
    super(scope, id, props);

    const api = new RestApi(this, `${props.stageName}-ChatApi`, {
      deployOptions: {
        stageName: props.stageName,
        tracingEnabled: true
      }
    });

    const userPool = new UserPool(this, 'CognitoUserPool', {
      userPoolName: `${props.serviceName}-${props.stageName}-UserPool`,
      selfSignUpEnabled: true,
      signInAliases: { email: true }
    });

    const webUserPoolClient = new UserPoolClient(this, 'WebUserPoolClient', {
      userPool,
      authFlows: {
        userSrp: true
      },
      preventUserExistenceErrors: true
    });

    new CfnOutput(this, 'UserPoolId', { value: userPool.userPoolId });
    new CfnOutput(this, 'UserPoolClientId', { value: webUserPoolClient.userPoolClientId });

    this.momentoApiKeyParamName = `/${props.serviceName}/${props.ssmStageName}/momento-api-key`;
    this.momentoApiKeyParamArn = `arn:aws:ssm:${this.region}:${this.account}:parameter${this.momentoApiKeyParamName}`;

    this.chatsTable = this.createChatsTable();

    const listChatsFunction = this.createListChatsFunction(props);
    const newChatFunction = this.createNewChatFunction(props);
    const tokenVendingMachineFunction = this.createTokenVendingMachineFunction(props);

    this.createApiEndpoints(api, userPool, {
      listChats: listChatsFunction,
      newChat: newChatFunction,
      tokenVendingMachine: tokenVendingMachineFunction
    });
  }

  createChatsTable() {
    return new Table(this, 'ChatsTable', {
      billingMode: BillingMode.PAY_PER_REQUEST,
      partitionKey: { 
        name: 'chatName', 
        type: AttributeType.STRING
      }
    });
  }

  createListChatsFunction(props) {
    const func = new NodejsFunction(this, 'ListChatsFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/list-chats.js',
      memorySize: 1024,
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG',
        CHATS_TABLE_NAME: this.chatsTable.tableName
      }
    });

    this.chatsTable.grantReadData(func);

    return func;
  }

  createNewChatFunction(props) {
    const func = new NodejsFunction(this, 'NewChatFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/new-chat.js',
      memorySize: 1024,
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG',
        CHATS_TABLE_NAME: this.chatsTable.tableName
      }
    });

    this.chatsTable.grantWriteData(func);

    return func;
  }

  createTokenVendingMachineFunction(props) {
    const func = new NodejsFunction(this, 'TokenVendingMachineFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/token-vending-machine.js',
      memorySize: 1024,      
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        MOMENTO_API_KEY_PARAM_NAME: this.momentoApiKeyParamName,
        MOMENTO_CACHE_NAME,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG'
      }
    });

    func.role.attachInlinePolicy(new iam.Policy(this, 'TokenVendingMachineFunctionSsmPolicy', {
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [ 'ssm:GetParameter*' ],
          resources: [ this.momentoApiKeyParamArn ]
        })
      ]
    }));

    return func;
  }

  /**
   * 
   * @param {RestApi} api
   * @param {UserPool} userPool
   */
  createApiEndpoints(api, userPool, functions) {
    const authorizer = new CfnAuthorizer(this, 'CognitoAuthorizer', {
      name: 'CognitoAuthorizer',
      type: 'COGNITO_USER_POOLS',
      identitySource: 'method.request.header.Authorization',
      providerArns: [userPool.userPoolArn],
      restApiId: api.restApiId,
    });    

    const chatsResource = api.root.addResource('chats');

    // POST /chats
    chatsResource.addMethod('POST', new LambdaIntegration(functions.newChat), {
      authorizer: {
        authorizationType: AuthorizationType.COGNITO,
        authorizerId: authorizer.ref
      }
    });

    // GET /chats
    chatsResource.addMethod('GET', new LambdaIntegration(functions.listChats), {
      authorizer: {
        authorizationType: AuthorizationType.COGNITO,
        authorizerId: authorizer.ref
      }
    });

    chatsResource.addCorsPreflight({
      allowHeaders: ['*'],
      allowMethods: ['OPTIONS', 'GET', 'POST'],
      allowCredentials: true,
      allowOrigins: ['*']
    });

    // GET /token
    const tokenResource = api.root.addResource('token');
    tokenResource.addMethod('GET', new LambdaIntegration(functions.tokenVendingMachine), {
      authorizer: {
        authorizationType: AuthorizationType.COGNITO,
        authorizerId: authorizer.ref
      }
    });

    tokenResource.addCorsPreflight({
      allowHeaders: ['*'],
      allowMethods: ['OPTIONS', 'POST'],
      allowCredentials: true,
      allowOrigins: ['*']
    });
  }
}

module.exports = { ChatApiStack }

ここでは、API GatewayにAPIを作成し、前述のルートを実装するために3つのLambda関数を作成します:

TokenVendingMachine関数
NewChat関数
ListChats関数

認証と認可

APIはCognitoオーソライザーによって保護されています。

const authorizer = new CfnAuthorizer(this, 'CognitoAuthorizer', {
  name: 'CognitoAuthorizer',
  type: 'COGNITO_USER_POOLS',
  identitySource: 'method.request.header.Authorization',
  providerArns: [userPool.userPoolArn],
  restApiId: api.restApiId,
});

SSMパラメータを安全にロードする

この関数では、Momento API キーを環境変数として含めていないことに注意してください。代わりに、パラメータ名を渡しています:

environment: {
  SERVICE_NAME: props.serviceName,
  STAGE_NAME: props.stageName,
  MOMENTO_API_KEY_PARAM_NAME: this.momentoApiKeyParamName,
  MOMENTO_CACHE_NAME,
  POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG'
}

そして、実行時にパラメータを取得するIAMパーミッションを関数に与えます。

func.role.attachInlinePolicy(new iam.Policy(this, 'TokenVendingMachineFunctionSsmPolicy', {
  statements: [
    new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: [ 'ssm:GetParameter*' ],
      resources: [ this.momentoApiKeyParamArn ]
    })
  ]
}));

これは以下のために行っています:

・環境変数から情報を盗むような、危険な依存関係から身を守る。
・APIキーの有効期限を短くし、アプリケーションを再デプロイすることなくローテーションできる。

コールドスタート中、関数はSSMパラメータをフェッチし、復号化し、その値を数分間キャッシュします。キャッシュの有効期限が切れると、次の呼び出しはSSMパラメータストアから更新された値をフェッチしようとします。

こうすることで、呼び出すたびにSSMを呼び出す必要がなくなります。バックグラウンドで(cronジョブを使って)APIキーをローテーションすると、キャッシュの有効期限が切れた後、関数が自動的に新しいキーをピックアップします。

幸運なことに、ssmミドルウェアはこのフローをすぐにサポートしています。このミドルウェアに力仕事を任せることにするが、これについては後で詳しく説明します!

ワークフローの例

JIRAチケット 「ABP-1734 」に取り掛かる場合:

1.フューチャーブランチABP-1734を作成する。
2.cdk deploy –context stageName=FEAT-ABP-1734 –context ssmStageName=dev を実行してエフェメラル環境を作成する。これでチャットAPIの新しいインスタンスが作成され、自分の変更を分離して作業できるようになります。この新しい環境はdev SSMパラメータを使いますが、すべてのリソースにFEAT-ABP-1734サフィックスが付きます。
3. 変更を加えてテストし、PRを作成する。
4. cdk destroy –context stageName=FEAT-ABP-1734 –context ssmStageName=dev
を実行しエフェメラル環境を削除する。

これらの短期間の環境は、機能開発やCI/CDパイプラインでのテスト実行に便利です。従量課金制のおかげで、追加コストを発生させることなく、必要なだけの環境を持つことができます。

理想的には、1つの環境につき1つのMomentoキャッシュを持つことです。その場合、キャッシュの名前はstageNameを接頭辞または接尾辞に付けます。

ステップ5:TokenVendingMachine関数の実装

以下は、GET /tokenルートの背後にあるTokenVendingMachine関数のコードです:

const { initAuthClient, generateToken } = require('../lib/momento');
const middy = require('@middy/core');
const cors = require('@middy/http-cors');
const ssm = require('@middy/ssm');

module.exports.handler = middy(async (event, context) => {  
  await initAuthClient(context.MOMENTO_API_KEY);

  const tokenResult = await generateToken();

  return {
    statusCode: 200,
    body: JSON.stringify(tokenResult)
  }
})
.use(cors())
.use(ssm({
  cache: true,
  cacheExpiry: 5 * 60 * 1000,
  setToContext: true,
  fetchData: {
    MOMENTO_API_KEY: process.env.MOMENTO_API_KEY_PARAM_NAME
  }
}));

ここでは、ssmミドルウェアを使用して、SSMパラメータストアからMomento APIキーを取得し、キャッシュしています。

.use(ssm({
  cache: true,
  cacheExpiry: 5 * 60 * 1000,
  setToContext: true,
  fetchData: {
    MOMENTO_API_KEY: process.env.MOMENTO_API_KEY_PARAM_NAME
  }
}));

デフォルトでは、ミドルウェアは取得したデータを環境変数に注入します。しかし前述の通り、暗号化されていないAPIキーをラムダ関数の環境変数に入れるのは避けるべきです。攻撃者は環境変数をスキャンして機密情報を探すことが多いからです。

そこで、ミドルウェアに、取得したデータを代わりにラムダの呼び出しcontext オブジェクトに設定するよう依頼します。そのため、Momentoクライアントを初期化する際に、context.MOMENTO_API_KEYからMomento APIキーを取得する必要がります。

await initAuthClient(context.MOMENTO_API_KEY);

共有ロジックのカプセル化

先ほどのスニペットのように、Momentoに関連するすべての操作を共有のmomento.jsモジュールにカプセル化しました。

これには、Momento認証クライアントの初期化などの共有ロジックも含まれます。

const {
  CredentialProvider,
  AuthClient,
  DisposableTokenScopes,
  ExpiresIn,
  GenerateDisposableTokenResponse,
  AllTopics,
} = require('@gomomento/sdk');

const { Logger } = require('@aws-lambda-powertools/logger');
const logger = new Logger({ serviceName: 'leaderboard-api' });

const { MOMENTO_CACHE_NAME } = global.process.env;

let authClient;

async function initAuthClient(apiKey) {
  if (!authClient) {
    logger.info('Initializing Momento auth client');
    
    authClient = new AuthClient({
      credentialProvider: CredentialProvider.fromString(apiKey)
    });

    logger.info('Initialized Momento auth client');
  }
};

async function generateToken() {
  const result = await authClient.generateDisposableToken(
    DisposableTokenScopes.topicPublishSubscribe(MOMENTO_CACHE_NAME, AllTopics),
    ExpiresIn.minutes(30)
  );

  return {
    endpoint: result.endpoint,
    token: result.authToken,
    cacheName: MOMENTO_CACHE_NAME,
    expiresAt: result.expiresAt
  };
}

module.exports = { 
  initAuthClient,
  generateToken,
};

ここでは、Lambdaの実行環境が再利用されることを利用しています。

(コールドスタート時に)新しい実行環境が作成されると、authClient変数が設定されます。次に同じ実行環境を呼び出すと、initAuthClient関数は短絡してすぐにリターンします。

きめ細かな認証

この一行に注目してください:

const result = await authClient.generateDisposableToken(
  DisposableTokenScopes.topicPublishSubscribe(MOMENTO_CACHE_NAME, AllTopics),
  ExpiresIn.minutes(30)
);

ここでは、「chat 」キャッシュ内のすべてのトピックをパブリッシュ、サブスクライブできる短命トークンを生成しています。これはかなり寛容で、ユーザーはどのチャットルームにも参加し、メッセージを送ることができます。

私たちのデモでは、すべてのチャットルームが公開されているので、これは問題ありません。しかし、現実の世界では、プライベートなチャットルームもあるでしょう。その場合、DisposableTokenScopesを、ユーザーの権限に基づいたトピックのサブセットにしなければなりません。

ステップ6: ListChats関数の実装

以下は、GET /chatsルートの背後にあるListChats関数のコードです:

const { DynamoDB } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocumentClient, ScanCommand } = require("@aws-sdk/lib-dynamodb");
const dynamodbClient = new DynamoDB();
const dynamodb = DynamoDBDocumentClient.from(dynamodbClient);
const middy = require('@middy/core');
const cors = require('@middy/http-cors');

module.exports.handler = middy(async () => {
  const { Items } = await dynamodb.send(new ScanCommand({
    TableName: process.env.CHATS_TABLE_NAME    
  }))

  const chats = Items.map(x => ({ chatName: x.chatName }))
  return {
    statusCode: 200,
    body: JSON.stringify(chats)
  }
})
.use(cors());

この関数は、ChatsTable DynamoDBテーブルに対してスキャンを実行し、利用可能なすべてのテーブルを取得します。

ステップ 7: NewChat関数の実装

以下は、POST /chats ルートの背後にある NewChat 関数のコードです:

const { DynamoDB } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocumentClient, PutCommand } = require("@aws-sdk/lib-dynamodb");
const dynamodbClient = new DynamoDB();
const dynamodb = DynamoDBDocumentClient.from(dynamodbClient);
const middy = require('@middy/core');
const cors = require('@middy/http-cors');

module.exports.handler = middy(async (event) => {
  const { chatName } = JSON.parse(event.body);
  const chat = { chatName };

  await dynamodb.send(new PutCommand({
    TableName: process.env.CHATS_TABLE_NAME,
    Item: chat,
    ConditionExpression: "attribute_not_exists(chatName)"
  }));

  return {
    statusCode: 201,
    body: JSON.stringify(chat)
  };
})
.use(cors());

簡潔にするために、ChatsTable DynamoDBテーブルはシンプルにしました。

念のため、これはテーブルを作成したCDKのコードです:

createChatsTable() {
  return new Table(this, 'ChatsTable', {
    billingMode: BillingMode.PAY_PER_REQUEST,
    partitionKey: { 
      name: 'chatName', 
      type: AttributeType.STRING
    }
  });
}

テーブルにはchatNameというハッシュキーがあります。これは、重複したチャットルーム名を持つことは不可能であることを意味します。

チャットルームに関する追加のメタデータを含めることは一般的です – 例えば、それらが作成されたとき、それらを作成した人などです。

このデモでは、これらのメタデータは省略しています。しかし、現実味を増すために、同じ名前のチャットルームが既に存在する場合、PutCommandが既存のチャットルームを上書きしないようにします。そのため、ConditionExpression を入れて、そのようなことが起こらないようにしています。

await dynamodb.send(new PutCommand({
  TableName: process.env.CHATS_TABLE_NAME,
  Item: chat,
  ConditionExpression: "attribute_not_exists(chatName)"
}));

ステップ 8: フロントエンドでトピックを購読する

フロントエンドのコードについては、リアルタイム・メッセージングには関係ないものがほとんどなので、詳しく説明はしません。すべてのコードはdemo repoのfrontendディレクトリにあります。

以下が重要な部分です。

ユーザーがサインインした後:

  1. GET /tokenエンドポイントをユーザーのIDトークンで呼び出し、使い捨てトークンを取得する。
  2. GET /chatsエンドポイントを呼び出し、利用可能なチャットルームをリストアップする。

3. ユーザーがチャットルームの一つをクリックしたら、下記のjoinChat機能を使って、そのルームのトピックに登録する。

import { subscribeToTopic } from '@/lib/momento'

let subscription = null

const chatMessages = ref([])
const isConnected = ref(false)

...

const joinChat = async (chatName) => {
  if (subscription) {
    console.log('Unsubscribing from existing chat...')
    await subscription.unsubscribe()

    isConnected.value = false    
    chatMessages.value = []
  }
  
  console.log('Subscribing to chat:', chatName)
  subscription = await subscribeToTopic(
    momentoToken, 
    momentoCacheName, 
    chatName, 
    (jsonMsg) => {
      const { sender, message } = JSON.parse(jsonMsg)
      chatMessages.value.push({ sender, message })
    })
    
  isConnected.value = true
}

ここで注意すべきことがいくつかあります。

まず、一度に1つの部屋だけに接続されるようにするため、常に最初に現在のトピックから退会します。

if (subscription) {
  console.log('Unsubscribing from existing chat...')
  await subscription.unsubscribe()

  isConnected.value = false 
  chatMessages.value = []
}

次に、Momentoのトピックを購読するときに、トピックから新しいメッセージを受信したときに処理するデリゲート関数を渡します。

subscription = await subscribeToTopic(
  momentoToken, 
  momentoCacheName, 
  chatName,
  (jsonMsg) => {
    const { sender, message } = JSON.parse(jsonMsg)
    chatMessages.value.push({ sender, message })
  })

我々はここでいくつかの仮定をしています:

メッセージはJSON文字列である。
JSONオブジェクトには、送信者(電子メール)とメッセージが含まれる。
つまり、トピックにメッセージを送信する際には、メッセージに両方を含める必要がある。

しかし、この購読オブジェクトとは何でしょうか?それを理解するために、subscribeToTopicヘルパー関数を見てみましょう。

import { 
  TopicClient, 
  TopicConfigurations, 
  CredentialProvider
} from '@gomomento/sdk-web'

async function subscribeToTopic(authToken, cacheName, topicName, onMessage) {
  console.log('Initializing Momento topic client', authToken)

  const topicClient = new TopicClient({
    configuration: TopicConfigurations.Browser.latest(),
    credentialProvider: CredentialProvider.fromString({
      authToken
    })
  })

  console.log('Initialized Momento topic client')
  console.log('Subscribing to Momento topic:', { cacheName, topicName })

  const resp = await topicClient.subscribe(cacheName, topicName, {
    onItem: (item => onMessage(item.value()))
  })

  if (!resp.isSubscribed) {
    const error = resp.innerException()
    console.error(`Failed to subscribe to Momento topic [${topicName}]: `, error)
    throw error
  }

  return {
    send: async (message) => {
      await topicClient.publish(cacheName, topicName, message)
    },
    unsubscribe: async () => await resp.unsubscribe()
  }
}

最も重要な行はこれです:

const resp = await topicClient.subscribe(cacheName, topicName, {
  onItem: (item => onMessage(item.value()))
})

こうしてトピックを購読します。メッセージを受信すると、前述のデリゲート関数で処理されます。

残念ながら、トピックにメッセージを送信するにはtopicClientが必要です。また、トピックの購読を解除するには購読のレスポンスが必要です。

抽象化のリークを避けるために、このヘルパー関数は、UIコンポーネントがトピックにメッセージを送信したり、トピックからの購読を解除したりできるようにするオブジェクトを返します。

return {
  send: async (message) => {
    await topicClient.publish(cacheName, topicName, message)
  },
  unsubscribe: async () => await resp.unsubscribe()
}

これは以前に見たsubscriptionオブジェクトです。

ステップ9:トピックにメッセージを送る

トピックに接続したら、前述のsubscriptionオブジェクトを使用してトピックにメッセージを送信することができます。

const sendMessage = async () => {
  await subscription.send(JSON.stringify({
    sender: currentUserEmail.value,
    message: newMessage.value
  }))

  newMessage.value = ''
}

先ほど説明したように、送信者のEメールとメッセージをJSONとして含める必要があります。なぜなら、subscribe関数で受け取ることを期待しているからです。

これで完成です!これで、ユーザー同士がリアルタイムでコミュニケーションできるチャットアプリケーションが出来上がりました。会話はチャットルームに整理され、ユーザーはチャットルームに参加したり、チャットルームを作成したりすることができます。

異なるブラウザでアプリケーションの複数のインスタンスを実行したり、シークレットウィンドウを利用することができます。そうすることで、チャットルームにメッセージを送り、相手のウィンドウに即座にメッセージが表示されるのを見ることができます。

ステップ10: デプロイとテスト

最後に、cdk deployを実行してテストしてください!

実際にどのように動作するのか見たい場合は、こちらのフルコースをチェックしてください。