Django×Amazon SQSで簡単に非同期処理を実現

desk10 CODE

この記事では、Amazon SQSを使ってDjangoアプリで非同期処理を実現する方法を紹介します。

Djangoアプリで非同期処理をする場合、celeryasyncioなどがありますがどれも設定がめんどくさかったり、実装が手間なものが多いです。

DjangoアプリをEC2上で動かしている場合はSQSを使うことで非同期処理を行うための導入コストが格段に下がります。

また、バッチ専用サーバーや別サーバーに処理を移行したい場合はSQSを使うことでサーバー同士をシームレスに扱うことができます。

 

Amazon SQSとは?

SQSとはAWSが提供しているメッセージキューイングサービスです。

キューへメッセージを送信、あるいはキューからメッセージを受信・削除をすることができるシンプルなサービスになります。

上述したように、異なるサーバー同士でデータの送受信が簡単に行えます。

 

そもそもメッセージキューって何がすごいの?

メッセージキュー

メッセージキューとはサーバーレスおよびマイクロサービスアーキテクチャで使われる非同期サービス対サービスの通信形態を指します。

重い処理の分割、バッファーまたはバッチ処理、およびスパイクの多いワークロードを円滑にすることができます。

システム間が同期している必要がないので、好きなタイミングで処理を行うことができます。

ではさっそく導入する手順を紹介していきます。

 

Amazon SQS側の設定

基本的に何も難しい設定はないのでここではざっくり説明します。

キューの作成

今回は少し前に導入され、便利なFIFOキューを使用します。

標準キューでも使い方はあまり変わらないです。

django-sqs1

キュー名は「test.fifo」としました。

※FIFOキューは末尾に.fifoを付ける必要があります

 

django-sqs2

キューを作成できました。

キューのURLはDjangoアプリで使用するのでメモっておきましょう。

 

django-sqs3

 

作成したばかりのキューは、作成者のみしか送受信でない設定です。

他のAWSアカウントを許可したい場合は上記のアクセス許可の追加から行うことができます。

 

これでAWS側の最低限の設定は完了です。

上手く設定できているかEC2のCLIからメッセージの送受信のテストをしてみます。

 

メッセージ送信テスト

コマンドに関する詳細はAWSのドキュメントを参照してください。

aws sqs send-message --queue-url キューのURL --message-body "test message" 
--message-deduplication-id "deduplicationid" --message-group-id "groupid"

 

これはFIFOキューにおける最低限のメッセージ送信コマンドです。

パラメータのmessage-deduplication-id、message-group-idは標準キューの場合必要ありません。

 

キューに送信したメッセージをコンソール上で確認してみましょう。

AWSコンソールで対象キューを選択し、メッセージの表示を押下。

django-sqs5

 

「メッセージのポーリングを開始」を押下。

django-sqs6

 

無事メッセージが送信されました。

ポーリング中に削除しない場合はキューにメッセージが残り続けます。

 

CLIでもキュー内のメッセージを受信してみましょう。

メッセージ受信テスト

aws sqs receive-message --queue-url キューのURL --output json
{
    "Messages": [
        {
            "Body": "test message",
            "ReceiptHandle": ...,
            "MD5OfBody": ...,
            "MessageId": ...
        }
    ]
}

 

無事メッセージが受信できました。

AWS側の設定が確認できたので次はDjangoアプリ側の設定を行います。

 

Django側の設定

SQSとの接続はboto3のクライアントを使用するのでboto3をインストールしておきます。

pip install --upgrade boto3

 

settings.pyを編集

# AWS設定
AWS_REGION = 'your aws region'
AWS_ACCESS_KEY_ID = 'your access key'
AWS_SECRET_ACCESS_KEY = 'your secret key'

# SQS固有の設定をする場合は以下の様に記述
AWS_SQS_ACCESS_KEY=....
AWS_SQS_SECRET_ACCESS_KEY=...
AWS_SQS_REGION=....

以上でキューを送信する準備はできました。

実際にキューにメッセージを送信し非同期処理を行ってみましょう。

 

Djangoアプリで非同期処理をやってみる

今回はこんな感じのアーキテクチャを想定してキューを使用します。

aws-sqs-image

②、③のキューにメッセージを送受信する部分を解説します。

キューにメッセージを送信

import boto3
import json
from django.conf import settings

def send_message(user_id):
    # 引数のユーザーidを辞書に格納し、キューに送信

    body_dict = {"user_id": user_id}
    message_body = json.dumps(body_dict)  # メッセージのbodyはjson形式

    # sqsクライアントを生成
    client = boto3.client(
        'sqs',
        aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        region_name=settings.AWS_REGION
    )

    # キューにメッセージを送信
    response = client.send_message(
        QueueUrl=キューのURL,
        MessageBody=message_body,
        MessageGroupId=グループID,
        MessageDeduplicationId=重複ID,  # 重複排除 IDを使用する場合は設定する必要はない
    )
Djangoアプリ内でsend_message(1)のように呼び出せばキューに任意のユーザーidを送信できます。

続いて、キューに送信したユーザーidを取得する処理を用意します。

 

キューからメッセージを受信

メッセージの受信はdjangoのmanagement commandcronで定期実行することで非同期処理におけるワーカーの役割を担います。

 

まず、 キューからジョブを取り出し、ジョブを処理するコマンドを実装します。

app/management/commands内にsqs_reciever.pyを作成します。
import boto3
import json
from django.core.management.base import BaseCommand
from django.conf import settings

class Command(BaseCommand):
    def handle(self, *args, **options):

        # sqsクライアントを生成
        client = boto3.client(
            'sqs',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_REGION
        )

        # キューからメッセージを受信
        response = client.receive_message(
            QueueUrl=キューのURL,
            MaxNumberOfMessages=10,  # 最大受信メッセージ数
            VisibilityTimeout=30,  # 可視性タイムアウト秒数
        )

        # レスポンスの中からメッセージオブジェクトを取得
        messages = response['Messages']

        for message in messages:

        # メッセージはjson形式なのでjsonモジュールで読み込み
        message_body = json.loads(message['Body'])

        # キューからメッセージを取得できたらメッセージを削除
        # 削除しないとキューに残り続けてしまう
        client.delete_message(QueueUrl=キューのURL, ReceiptHandle=message["ReceiptHandle"])
     
        # 辞書形式のメッセージのbodyからユーザーidを取得
        user_id = message_body["user_id"]

    # ユーザーidを使って必要な処理を行う
        ....

このカスタムコマンドをキューのワーカーとして動かすためにcronで定期実行させます。

 

Djangoでカスタムコマンドは、python manage.py sqs_recieverで実行できるので、このコマンドをcronに設定します。
# cron編集コマンド
crontab -e

# 30分おきにワーカーを起動
*/30 * * * * /usr/bin/python3 /home/..../manage.py sqs_reciever
これで30分おきにsqs_recieverコマンドが実行され、sqsからメッセージを受信できます。

 

以上、Django×Amazon SQSで非同期処理を行う方法を紹介しました。

ワーカーの部分は今回cronで行いましたが、SQSのキューをトリガーにLambdaで必要な処理を行うこともできます。

是非一度試しみてください。