API Gateway → Lambda → SQS → Lambda でデータを流してみた

プログラミング

概要

CDKを用いてタイトルのような順番でデータを流す構成を試してみました。
簡単な構成図は以下のとおりです。


ChatGPTに丸投げしたら結構いい感じのが一発でできたのでそれをベースに改造していきました。

この記事を作成したときのコードが以下リポジトリにあります。

Release v1.0.0 · nakajima97/lambda-sqs-api-gateway-examples
概要以下の構成を試したAPI Gateway↓Lambda↓SQS↓Lambda

やってみた

ChatGPTに投げてみた

ChatGPTって便利ですよね
適当に投げかければいい感じの土台が一瞬でできるので。
何を投げれば良いのかはあまり考えず適当に以下を投げてみた。

AWS CDKで以下の構成を組むときのコードを教えて

- API Gateway → Lambda  → SQS → Lambda
- API Gatewayと結びつけるLambdaはリクエストデータをSQSに push するだけ
- SQSからデータを取得するLambdaはSQSから取り出したデータをログに出すだけ

投げた結果が以下。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { HttpApi, HttpMethod } from 'aws-cdk-lib/aws-apigatewayv2';
import { Function, Runtime, Code } from 'aws-cdk-lib/aws-lambda';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

export class ApiGatewayLambdaSqsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the SQS queue
    const queue = new Queue(this, 'Queue');

    // Create the Lambda function for pushing data to SQS
    const pushDataToQueueFunction = new Function(this, 'PushDataToQueueFunction', {
      runtime: Runtime.NODEJS_14_X,
      handler: 'index.handler',
      code: Code.fromAsset('lambda'),
      environment: {
        QUEUE_URL: queue.queueUrl
      }
    });

    // Create the Lambda function for processing data from SQS
    const processDataFromQueueFunction = new Function(this, 'ProcessDataFromQueueFunction', {
      runtime: Runtime.NODEJS_14_X,
      handler: 'index.handler',
      code: Code.fromAsset('lambda')
    });
    processDataFromQueueFunction.addEventSource(new SqsEventSource(queue));

    // Create the API Gateway
    const api = new HttpApi(this, 'ApiGateway');

    // Configure the integration between API Gateway and the pushDataToQueueFunction Lambda function
    const pushDataIntegration = new LambdaProxyIntegration({
      handler: pushDataToQueueFunction
    });
    api.addRoutes({
      path: '/push',
      methods: [HttpMethod.POST],
      integration: pushDataIntegration
    });

    // Configure the integration between API Gateway and the processDataFromQueueFunction Lambda function
    const processDataIntegration = new LambdaProxyIntegration({
      handler: processDataFromQueueFunction
    });
    api.addRoutes({
      path: '/process',
      methods: [HttpMethod.GET],
      integration: processDataIntegration
    });
  }
}

これ瞬時に出してくれるのは本当にすごいですよね。
自分だったらこれ組むのにCDK慣れていないこともあり1時間とかかかるんじゃないんでしょうか。

投げてから気が付いたのですがlambdaで使う言語とか一切指定していませんでした。
ただ、使いたかったnodeで出してくれたのでこれをそのまま採用することに。

肝心のlambdaのコードがなかったのでこれもお願いすることに

上記で使用している具体的なLambdaのコードを教えて

それで出してくれたのが以下

exports.handler = async (event) => {
  // リクエストデータを取得するなどの処理を行います
  const requestData = event.body;

  // SQSキューにデータをプッシュします
  // QUEUE_URLはLambda関数の環境変数から取得します
  const params = {
    MessageBody: requestData,
    QueueUrl: process.env.QUEUE_URL
  };
  
  // SQSへのデータプッシュ処理
  // 例: AWS SDKを使用してSQSにデータをプッシュする処理を行う
  const AWS = require('aws-sdk');
  const sqs = new AWS.SQS();
  await sqs.sendMessage(params).promise();
  
  // レスポンスを返します
  return {
    statusCode: 200,
    body: 'Data pushed to SQS'
  };
};
exports.handler = async (event) => {
  // SQSキューからデータを取得します
  // 例: AWS SDKを使用してSQSからデータを取得する処理を行う
  const AWS = require('aws-sdk');
  const sqs = new AWS.SQS();
  const params = {
    QueueUrl: process.env.QUEUE_URL,
    MaxNumberOfMessages: 1,
    VisibilityTimeout: 10,
    WaitTimeSeconds: 0
  };
  const data = await sqs.receiveMessage(params).promise();
  
  // 取得したデータをログに出力します
  console.log(data);
  
  // レスポンスを返します
  return {
    statusCode: 200,
    body: 'Data processed from SQS'
  };
};

本当に最高

ただこのままではぱっと見でも動ない気がした
以下直した方が良いと思ったので修正

  1. 2つのlambda関数が両方ともAPI Gatewayと紐づいている
  2. aws-cdk-lib/aws-apigatewayv2 ではなく aws-cdk-lib/aws-apigateway を使いたい
    • TypeScript WorkShopがこっちだったのでとりあえずこっち使って動かしたい
      https://cdkworkshop.com/ja/20-typescript/30-hello-cdk/400-apigw.html
  3. processDataFromQueueFunctionにSQSのURLが設定されていない

で修正した結果が以下

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { Function, Runtime, Code } from 'aws-cdk-lib/aws-lambda';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { LambdaRestApi } from 'aws-cdk-lib/aws-apigateway';
import { PolicyStatement } from 'aws-cdk-lib/aws-iam';

export class LambdaSqsApiGatewayExamplesStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the SQS queue
    const queue = new Queue(this, 'Queue');

    // Create the Lambda function for pushing data to SQS
    const pushDataToQueueFunction = new Function(this, 'PushDataToQueueFunction', {
      runtime: Runtime.NODEJS_14_X,
      handler: 'index.handler',
      code: Code.fromAsset('lambda/pushDataToQueueFunction'),
      environment: {
        QUEUE_URL: queue.queueUrl
      },
      timeout: cdk.Duration.seconds(10),
    });

    // Create the Lambda function for processing data from SQS
    const processDataFromQueueFunction = new Function(this, 'ProcessDataFromQueueFunction', {
      runtime: Runtime.NODEJS_14_X,
      handler: 'index.handler',
      code: Code.fromAsset('lambda/processDataFromQueueFunction'),
      timeout: cdk.Duration.seconds(10),
      environment: {
        QUEUE_URL: queue.queueUrl
      },
    });
    processDataFromQueueFunction.addEventSource(new SqsEventSource(queue));

    // Create the API Gateway
    new LambdaRestApi(this, 'Endpoint', {
      handler: pushDataToQueueFunction
    });

    pushDataToQueueFunction.addToRolePolicy(new PolicyStatement({ actions: ['sqs:SendMessage'], resources: [queue.queueArn] }))
  }
}

これでデプロイして動作を確認したがSQSからデータを取る処理が思ったような値が取れない
IDしか取れず、データの中身が確認できなかった。


ネットで調べて試行錯誤して直した結果が以下。

exports.handler = async (event) => {
  // SQSキューからデータを取得します
  // 例: AWS SDKを使用してSQSからデータを取得する処理を行う
  const AWS = require("aws-sdk");
  const sqs = new AWS.SQS();
  const params = {
    QueueUrl: process.env.QUEUE_URL,
    MaxNumberOfMessages: 1,
    VisibilityTimeout: 10,
    WaitTimeSeconds: 0,
  };
  const data = await sqs.receiveMessage(params).promise();

  // 取得したデータをログに出力します
  event.Records.forEach((record) => {
    const { body } = record;
    console.log({ body });
  });

  // レスポンスを返します
  return {
    statusCode: 200,
    body: "Data processed from SQS",
  };
};

CloudWatchLogsでbodyを確認できた

感想

ChatGPT本当にすごいですね。
知識薄くても簡単に動くものができました。

CDKは使い始めたばかりで全然慣れていないので勉強して慣れていきたいです。

Follow me!

コメント

PAGE TOP
タイトルとURLをコピーしました