Amazon EventBridge経由でAmazon DynamoDBのストリームをMomentoと連携 ~ AWS CDKで完全自動化!

AWS CDKを使用したAmazon EventBridgeとMomentoの統合の完全なウォークスルー。

Rishti Gupta

プロジェクトの GitHubリポジトリで、ステップ・バイ・ステップのセットアップ手順をご覧ください。ウェブ・アプリケーションの動作については、このブログ記事 をご覧ください。

このブログでは、AWS CDK (Cloud Development Kit) を使って Amazon EventBridge と Momento の統合を説明します。気象データをDynamoDBに保存し、AWSサービスとMomentoを組み合わせて変更を伝播するサンプルアプリケーションを使用します。

CDKスタックを使ったデプロイの自動化に焦点を当てます。具体的には、DynamoDBテーブルの変更をリッスンし、Amazon EventBridgeを使用してこれらのイベントをMomento キャッシュトピックスにルーティングする方法を示します。

CDKスタック

提供されているCDKスタックは、DynamoDBAmazon EventBridge, Momentoを統合している。順を追って説明します。

1.DynamoDBテーブルの定義

まず、気象統計を格納するDynamoDBテーブルを作成する。このテーブルには、変更をキャプチャするためのストリームが設定されています:

const weatherStatsDemoTable = new dynamodb.Table(this, "weather-stats-demo-table", {
  tableName: "weather-stats-demo",
  partitionKey: { name: "Location", type: dynamodb.AttributeType.STRING },
  stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});

2.EventBridgeの接続を設定する

次に、EventBridgeがMomentoにリクエストを送信するための接続を作成します。この接続では、AWS Secrets Manager に保存されている API キーを使用して認証を行います:

const connection = new events.Connection(this, 'weather-stats-demo-connection', {  connectionName: 'weather-stats-demo-connection',  authorization: events.Authorization.apiKey('Authorization', cdk.SecretValue.secretsManager(apiKeySecret.secretName)),  description: 'Connection with API Key Authorization',});

この接続は、EventBridgeがMomentoへのリクエストを安全に認証するために重要です。

注:Momentoは現在、Amazon EventBridgeのパートナーAPIデスティネーションとしてフィーチャーされています!🎉 もしCDKのコードを使ってデプロイするのではなく、AWSコンソールから手動でセットアップしたい場合は、このガイドに従って始めることができます。

3. MomentoのAPI Destinationsを設定する

EventBridgeのAPI Destinationsは、イベントを外部サービスにルーティングするためのものです。ここでは、Momentoの操作別にAPI Destinationsを設定する:

  • キャッシュプット操作: Momentoのキャッシュにデータをセットする。
  • トピック公開操作: イベントをMomentoトピックに公開します。
  • キャッシュ削除操作: Momentoのキャッシュからデータを削除します。
// Define the API destination for the cache put operation.
    const cachePutApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-put-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-put-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Set API",
      httpMethod: events.HttpMethod.PUT,
    });

    // Define the API destination for the topic publish operation
    const topicPublishApiDestination = new events.ApiDestination(this, "weather-stats-demo-topic-publish-api-destination", {
      apiDestinationName: "weather-stats-demo-topic-publish-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/topics/*/*`,
      description: "Topic Publish API",
      httpMethod: events.HttpMethod.POST,
    });

    // Define the API destination for the cache delete operation
    const cacheDeleteApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-delete-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-delete-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Delete API",
      httpMethod: events.HttpMethod.DELETE,
    });

4.デッドレターキュー(DLQ)の実装

デッドレターキュー(DLQ)は、処理に失敗したイベントが失われないようにするものである。処理に失敗したイベントは破棄されるのではなく、DLQに送られ、そこで後で再試行することができます。

我々のスタックでは、各EventBridge Pipeに対してDLQを構成し、ターゲットAPIデスティネーションで正常に処理できなかったイベントを処理する:

const deadLetterQueue = new sqs.Queue(this, "DeadLetterQueue", {
      queueName: "weather-stats-demo-dlq",
      retentionPeriod: cdk.Duration.days(14),
    });

5. EventBridgeパイプの作成

EventBridgeパイプは、イベントソースとターゲットを接続します。ここでは、異なるタイプのDynamoDBイベントを処理するために、3つのパイプを定義しています:

  • キャッシュ・プット・パイプ:  INSERT  および  MODIFY イベントをキャッシュ put API にルーティングします。
  • トピック・パブリッシュ・パイプ: イベントをトピック・パブリッシュ API にルーティングします。
  • キャッシュ削除パイプ:  REMOVEイベントをキャッシュ削除 API にルーティングします。
// Define the pipe for the cache put operation
    const cachePutCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-put-pipe", {
      name: "weather-stats-demo-cache-put-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT", "MODIFY"]}',
          }],
        },
      },
      target: cachePutApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the topic publish operation
    const topicPublishCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-topic-publish-pipe", {
      name: "weather-stats-demo-topic-publish-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
      },
      target: topicPublishApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the cache delete operation
    const cacheDeleteCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-delete-pipe", {
      name: "weather-stats-demo-cache-delete-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["REMOVE"]}',
          }],
        },
      },
      target: cacheDeleteApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Add target parameters to the pipes
    cachePutCfnPipe.targetParameters = {
      inputTemplate: "{\n  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S",
          ttl_seconds: "$.dynamodb.NewImage.TTL.N"
        },
      },
    };

    topicPublishCfnPipe.targetParameters = {
      inputTemplate: "{\n \"EventType\": <$.eventName>,  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName, topicName],
      },
    };

    cacheDeleteCfnPipe.targetParameters = {
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S"
        },
      },
    };

また、これらのパイプにターゲット・パラメータを追加して、Momento に送信する前にデータをどのようにフォーマットするかを指定する。

6. IAMロールとポリシーの定義

IAMロールは、EventBridge PipesがDynamoDBストリーム、API Destinations、および失敗したイベントを処理するためのDead Letter Queue (DLQ) にアクセスすることを許可するポリシーとともに作成されます:

const role = new iam.Role(this, "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache", {
  roleName: "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache",
  assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
});
this.addPolicyForEventBridgeRole(role, cachePutApiDestination, cacheDeleteApiDestination, topicPublishApiDestination, weatherStatsDemoTable, deadLetterQueue);

7.ユーティリティリソースの作成

APIキーのシークレット、CloudWatchロググループ、Momento APIのパラメータ化された値など、いくつかのユーティリティリソースを作成します:

const apiKeySecret = new Secret(this, 'MomentoEventbridgeApiKey', {
  secretName: 'momento-eventbridge-api-key',
  secretStringValue: new cdk.SecretValue(momentoApiKeyParameter.valueAsString),
});

CDKコードのデプロイ

まず、GitHubのリポジトリをクローンし、EventBridgeのサンプルにcdします:

git clone https://github.com/momentohq/client-sdk-javascript
cd examples/nodejs/aws/eventbridge

デプロイする前に、以下のものが必要です:

  • Momentoキャッシュ: momento-eventbridge-cacheというキャッシュが必要です。Momento コンソール.で作成できます。
  • Momento APIキー: これもMomento コンソール.で作成できます。APIキーがキャッシュと同じリージョンに作成されていることを確認してください! 
  • HTTP API エンドポイント: API Key を作成した後、Momento コンソールからコピーするか、 ドキュメントのリージョンセクションを参照してください。.
  • AWS 認証情報: AWS アカウントの AccessKeyId、SecretAccessKey (および、一時的な認証情報を使用している場合はオプションで SessionToken)。

次に、プロジェクトのルート・ディレクトリに.envファイルを作成し、以下の環境変数を設定する:

MOMENTO_API_KEY=<your-momento-api-key> 
MOMENTO_API_ENDPOINT=<your-momento-api-endpoint>
AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
AWS_REGION=<your-aws-region>
AWS_SESSION_TOKEN=<your-aws-session-token> # Optional, if you are using temporary credentials

infrastructure ディレクトリには、このブログで前述したDynamoDBテーブル、DynamoDB Stream、EventBridgeリソースを定義したCDKアプリケーションが含まれています。これらは、残りのデモコードに不可欠なものなので、プロジェクトの他の部分を実行する前に、CDKスタックをデプロイする必要があります。

CDKスタックをデプロイするには、以下のスクリプトを実行します:

./deploy-stack.sh

これで必要なAWSリソースがすべてプロビジョニングされ、デモコードの実行を開始する準備が整う。 

結論

このセットアップにより、DynamoDB、EventBridge、Momentoを統合したAWS CDKを使用した自動化パイプラインが作成され、天候の更新をリアルタイムで処理できるようになりました。このスタックは、需要に応じて無理なくスケールするキャッシュレイヤーを備えたイベント駆動型アプリケーションを構築する方法の強力な例です。

その他のリソース

サンプル・アプリケーションの詳細については、こちらをご覧ください。 

AWS EventBridgeとMomentoの統合の詳細については、以下のリソースを参照してください:

Rishti Gupta

リシュティはインド出身で、現在はワシントン州シアトルのテックハブで活躍している優秀なソフトウェアエンジニアです。フルスタックエンジニアとしての確固たる経歴を持つリシュティは、Amazonでの経験を生かし、現在はMomentoのエンジニアリングチームの重要なメンバーとして活躍しています。さまざまなソフトウェア開発キット(SDK)の開発に携わり、現在はMomentoのユーザーフレンドリーなウェブコンソールの開発に専門知識を注いでいます。コーディングにとどまらず、リシュティはダンスのリズム、映画の魔法、旅行を通して探求するスリルに喜びを見出しています。彼女のテクノロジーへの情熱は、デジタル領域以外の人生への熱意と一致している。