kawasin73のブログ

技術記事とかいろんなことをかくブログです

SQS を使った Python の非同期ワーカーは ndkale しかない

誰一人見捨てない!!! どうも、かわしんです。Celery は見捨てるんです。

この記事は Pythonその2 Advent Calendar 2019 の 15 日目の記事です。

やや強めのタイトルですが、AWS SQS を使った非同期ワーカーでまともな実装は ndkale しかないという内容です。Celery は論外です。

github.com

前半はディスってばっかりなので、ndkale のことだけを知りたい場合は途中の「大本命 ndkale」から読んでください

前提としての欲しい機能

まず、諸々をディスる前に非同期ワーカーとして欲しい機能をあげておきます。

  • 正しく SQS を使って信頼性のあるタスク実行をする
  • 即時再実行をする
  • 複数のキューを使い分ける。また同じタスクでも動的に利用するキューを切り替えたい
  • Dead Letter Queue も使えると嬉しい

まず Celeryディスる

Python の非同期処理基盤として有名な OSS プロダクトとして Celery があります。

それなりにたくさんの人が使っていて、良さげな雰囲気があります。

Celery では Broker のうち RabbitMQRedisSQS の 3 つを Stable としてサポートしています。

Brokers — Celery 4.3.0 documentation

そこで SQS を Celery で使って試していました。

Celery のダメなところ

当たり前ですが、OSS を使うときはその中身のコードをざっと読んで本番環境で使える品質のものなのかを確かめます。

しかし、Celery は内部のコードが全く読めません 。どこが起動時のエンドポイントなのか、どこで Broker を切り分けているのか、コアとなる処理はどこに書いてあるのか、コードが難解すぎて全くわかりません。いざエントリーポイントを見つけてコードを追っていってもあっという間に呼び出す関数を見失ってしまいます。

ただ、たくさんの人が使っている信頼性があると思われるプロダクトだし、中のコードは読めないけど色々なケースを考慮して作られていることは読み取れたので Celery を使っていました。

Celery + SQS の致命的にダメなところ

Celeryデバッグログをだしながら使っていて気づいたことなのですが、Celery はプロセスの強制終了でメッセージをロスト します。これは信頼性のあるメッセージ伝達を行う SQS を使うワーカーとしては致命的です。もはや本番環境では使えない品質です。

まず、SQS はメッセージの At Least Once での伝達(FIFO キューの場合は Exactly Once)を実現するために以下の手順でメッセージの購読を行うことを想定しています。

  1. メッセージを SQS からタイムアウト付き(可視性タイムアウト)で 取得
  2. メッセージの内容でタスクを実行
  3. タスクが完了したら SQS からメッセージを 削除
  4. もしタスクの再実行が必要な場合は、タイムアウトによって自動的に復帰するのを待つか、タイムアウト0 に更新して即時復帰させる

タスクの実行が完了した後にメッセージを削除するので、タスク実行中にプロセスが突然終了した場合でもメッセージはタイムアウトの時間が経過後に復活して再実行されます。

もちろん、タスクの実行完了とメッセージの削除の間でプロセスが死ぬ場合もあるので、タスクは冪等に作る必要があります。

さて、Celeryデバッグログを観察する限り以下のような順序で処理をしているようでした。

  1. メッセージを SQS から 取得
  2. メッセージを 削除
  3. タスクを実行
  4. もしタスクの再実行が必要な場合は、新しいメッセージを SQS に送信

メッセージを削除するタイミングが早すぎます。もし、タスクの実行中にプロセスが突然終了した場合にはそのタスクは再実行されることなく、データのロストに繋がる可能性があります。

Celery を SQS で使う限りメッセージの信頼性はない です。

これはあくまでもデバッグログの順序を観察して僕が推測したことなので違うかもしれませんが、確かめようにも内部のコードが読めないので仕方ないです。

Celery は複数の Broker に対応しているため Broker のモデルとしての抽象化が難しく、妥協としてこのような実装になっていると思うので仕方ないとも言えます。が、少なくとも本番環境で使えるような品質ではないので Stable と表記するのはやめていただきたいです。

そのほかのライブラリをざっとディスる

さて、本命の ndkale にたどり着くまでにたくさんの SQS を使った非同期ワーカーライブラリを調査してその内部コードを読んだので、それぞれディスっていきます。もちろんそれぞれいいところはあるのですが、そこまで書くと長くなるので採用しなかった理由だけを書いていきます。

大本命 ndkale

さて、調査の結果 ndkale が実装の品質が高く、手を入れれば本番環境でも使えそうだったので利用しました。

engblog.nextdoor.com

概要はこのブログに記載されていますが、Celery で苦しんだ末に一から非同期ワーカーを実装されたようです。このブログ当時で 100万/day 単位のタスクを分散環境で処理している実績があります。

プレフィックスの nd は作成した会社の NextDoor からきているんだと思います。パッケージ名自体は kale です。2014 年に作られたライブラリで 5 年間運用されているようで、現在でもメンテナンスされているみたいです。

ndkale の最大の特徴は、優先度付きキュー に対応していることです。実現するために Lottery Algorithm を拡張した独自の Reduced Lottery というアルゴリズムでスタベーションなくメッセージを処理します。

もちろん、SQS を正しく使ってデータのロストがおこらないように実装しています(実際にはバグがありましたが、僕が修正 PR を投げました)。再実行についてはリトライ回数をインクリメントしたメッセージを再送信してからメッセージを消すようにしています。SQS のメッセージは Immutable なので再送信をしているようです。SQS が用意している Dead Letter Queue の仕組みは利用せず、独自に再実行回数を判別して Dead Letter Queue へ送信しています。

内部実装を読みましたが、設計と実装の品質がとても高い です。綺麗に書かれているのでスラスラ読めました。設計もよく考えられていて、利用者による機能の拡張がユーザーに露出するコンポーネントである「Task」と「Worker」を継承することで可能なような設計になっています。また、Task と Worker には様々なフックが用意されており、処理の注入もやりやすいです。

一方で、詳しいドキュメントがない という欠点もあります。簡単な使い方と ndkale の概念が README に書かれているだけで、手の込んだ使い方は自分で内部実装を読んで理解するしかありません。しかし、内部実装が品質が高くとてもわかりやすいので、読んでいるだけで 作者の気持ちがわかり、こうやって使えばいいんだなとか、ここを拡張すればいいんだな、みたいなことが読み取れます。読むと感動すると思います。すごい。

あと、仕様上の課題ですが、複数のキューに対応した時に メッセージがあるにも関わらず最大 20 秒間処理が止まってしまう 可能性があるという課題があります。 全てのキューにメッセージがない時にはランダムに選ばれたキューを Long Polling してメッセージを待ちます。しかし、その間に別のキューにメッセージがきても気づくことができず、Long Polling の最大時間である 20 秒間処理されない可能性があります。

この課題は頻繁にメッセージがくるような成熟したサービスであればそもそも Long Polling する時間も短く済むため問題にはなりませんが、全てのキューでメッセージの頻度が少ない場合に問題になります。解決策としては 複数キューでは使わずに単独のキューのみを監視するように使う 方法しかありません。最大の売りである優先度付きキューの機能は無視することになりますが、そのほかの単純なワーカーとしての品質が高いため ndkale を単独キューで利用します。

ndkale に足りないこと

ここまで大絶賛の ndkale ですが、残念ながらこのまま使うには不十分です。ただし、設計と実装の品質が高いため簡単に足りない機能を追加することができます。

単独キューを待って、複数のキューに送信する

Long Polling による別のキューのメッセージ遅延を防ぐためにワーカーは 1 つのキューのみに対してメッセージの監視をします。一方で、タスクの役割ごとに利用するキューは変えたいため、1 つのキューを監視して、複数のキューのうちのどれかに送信するということをしたいです。

残念ながら ndkale では、受信しているキューへ対してしか送信できないため、キューの設定ファイルを 2 つ用意して対応します。

ndkale では、キューの情報は QueueInfo のクラス変数に保持されています。内部実装を見る限り、受信用のキューは QueueInfo._queues から参照し、送信用のキューは QueueInfo._simple_name_queues_map から参照しているので、まず受信用のキューをローディングした後に、送信用のキューを QueueInfo._simple_name_queues_map に追加することで解決します。

from kale import settings, utils, worker
from kale.queue_info import QueueInfo

queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
# load queues for dequeue
QueueInfo(
    config_file=settings.QUEUE_CONFIG,
    sqs_talk=None,
    queue_cls=queue_class
)

# load queues for enqueue
queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
for q in queues:
    if q.simple_name not in QueueInfo._simple_name_queues_map:
        QueueInfo._simple_name_queues_map[q.simple_name] = q

キューの動的な選択

タスクは kale.Task クラスを継承して定義します。送信するキューの名前は継承したタスククラスの queue というクラス変数に設定しますが、これで固定されてしまいます。

タスクの publish() で動的に送信するキューを変更したいのですが、単にクラス変数を上書きするだけでは別プロセスで実行されるワーカーでリトライ時に再送信されるキューがデフォルトのものに戻ってしまうため、うまくいきません。並行処理での問題もあります。

そこで、メッセージが用意している app_data にキューの名前を指定してそれをワーカーにも伝搬させる方針をとることにしました。キューの送信を行う Publisher を差し替えることで実現しています。Publisher の差し替えも Task の継承のオーバーライドでできるように設計されていてすごいです。

from kale import publisher, task

class TaskClass:
    def __init__(self, task_class, module=None, name=None, queue=None, time_limit=None):
        self.__module__ = module or task_class.__module__
        self.__name__ = name or task_class.__name__
        self.queue = queue or task_class.queue
        self.time_limit = time_limit or task_class.time_limit

class Publisher(publisher.Publisher):
    def publish(self, task_class, task_id, payload, current_retry_num=None, delay_sec=None):
        """
        Override original method to inject TaskClass
        """
        app_data = payload.get('app_data')
        if isinstance(app_data, dict):
            task_class = TaskClass(task_class, queue=app_data.get('queue'))
        super(Publisher, self).publish(task_class, task_id, payload, current_retry_num=current_retry_num,
                                       delay_sec=delay_sec)

task_publisher = None

class BaseTask(task.Task):
    @staticmethod
    def _get_publisher():
        """
        Override original class method to inject custom Publisher
        """
        global task_publisher
        if task_publisher is None:
            task_publisher = Publisher()
        return task_publisher


class SomeTask(BaseTask):
    pass

# dynamic queue selection for enqueue
SomeTask.publish({'queue': 'not_default_queue'}, msg)

再実行時刻を指定するリトライエラー

タスク実行時にエラーが発生した場合は Task._get_delay_sec_for_retry(current_retry_num) で計算される秒数待って再実行するようにして、exponential backoff に対応しています。

しかし、エラーに設定した時刻に再実行されるほうが扱いやすいので、retry_at を設定したエラーを送出することでその時刻に再実行するように DelaySeconds を設定して欲しいです。

残念ながら、ndkale はエラーの種類による動的なリトライ時間の計算は想定していなかったようで、ndkale 本体に手を入れなければこれは実現できません。

# kale.task.Task
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        return cls._get_delay_sec_for_retry_with_message(message.task_retry_num)

    def handle_failure(cls, message, raised_exception):
        # delay_sec = cls._get_delay_sec_for_retry(message.task_retry_num)
        delay_sec = cls._get_delay_sec_for_retry_with_message(message, raised_exception)

https://github.com/aces-inc/ndkale/commit/4787a8a0218fe575621bccc7752635b291e61c39#diff-9230566b060f8c0681ff4527bc9e483aR174

ndkale をこのように修正した上で以下のように Task を拡張して _get_delay_sec_for_retry_with_message() 関数をオーバーライドすればリトライ時刻の指定が可能になります。

class RetryError(Exception):
    def __init__(self, msg, retry_at=None):
        """
        retry_at must be set tzinfo
        """
        self.msg = msg
        self.retry_at = retry_at

class BaseTask(task.Task):
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        if isinstance(raised_exception, RetryError) and raised_exception.retry_at is not None:
            now = timezone.now()
            if raised_exception.retry_at < now:
                # retry immediately
                return 0
            diff = raised_exception.retry_at - now
            if diff.seconds > 900:
                # SQS max delay_time is 15 minutes (900 seconds)
                return 900
            else:
                seconds = diff.seconds
                if diff.microseconds > 0:
                    seconds += 1
                return seconds

        return super(BaseTask, cls)._get_delay_sec_for_retry_with_message(message, raised_exception)

retry_at0 の場合は即時再実行、None の場合は従来の exponential backoff でリトライします。また、SQS の DelaySeconds の最大値は 900 秒であるため、それより長い場合には 900 秒に切り詰めます。また秒以下の値は切り上げになります。

Django で使う

Django と組み合わせて使うためには、Django のコンテキストをうまく初期化して使う必要があります。Django の初期化をしつつ Worker を単独で起動する方法として、manage.py でコマンドとして起動する方法を採用します。

また、タスク内で Django のデータベースへのアクセスをする場合は、コネクションプールにあるデータベースへのコネクションを気にする必要があります。コネクションは、データベースに設定されたアイドル時間によるタイムアウトや大きすぎるサイズのクエリなどによって強制的に切断されてしまう場合があります。

これらの切断されたコネクションをそのまま使うと以下のようなエラーが発生します。

(2006, 'MySQL server has gone away')

これを解決するためには切断されたコネクションをコネクションプールから排除する必要がありますが、Django は自動でコネクションプールの死活管理をしてくれません。そのため、タスクの実行後に close_old_connections() を呼び出してコネクションプール内の切断されたコネクションを排除します。

Django のコマンドとして呼び出すために、<package_name>/management/commands/worker.py に以下のようなコマンドを定義します。

from __future__ import absolute_import, print_function

import logging

from django.core.management.base import BaseCommand
from django.db import close_old_connections
from kale import settings, utils, worker
from kale.queue_info import QueueInfo

logger = logging.getLogger(__name__)

class DjangoWorker(worker.Worker):
    def _on_task_succeeded(self, message, time_remaining_sec):
        super()._on_task_succeeded(message, time_remaining_sec)
        # cleanup db connection pool for each task
        close_old_connections()

    def _on_task_failed(self, message, time_remaining_sec, err, permanent_failure):
        super()._on_task_failed(message, time_remaining_sec, err, permanent_failure)
        # cleanup db connection pool for each task
        close_old_connections()

class Command(BaseCommand):
    help = 'worker'

    def handle(self, *args, **options):
        queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
        # load queues for dequeue
        QueueInfo(
            config_file=settings.QUEUE_CONFIG,
            sqs_talk=None,
            queue_cls=queue_class
        )

        # load queues for enqueue
        queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
        for q in queues:
            if q.simple_name not in QueueInfo._simple_name_queues_map:
                QueueInfo._simple_name_queues_map[q.simple_name] = q

        logger.info('Task worker is running ...')
        DjangoWorker().run()

python manage.py worker と実行すればワーカーが起動されます。

テストで即時実行したい

非同期処理をテストすることはいつそのタスクが実行されるかを制御する必要があって難しいです。そのため、テストの時だけ非同期処理を同期的に実行したくなります。

残念ながら ndkale にはテスト時にだけ同期的に処理する仕組みはないですが、以下のように publish_now() クラスメソッドを Task に追加することで実現できます。

class BaseTask(task.Task):
    @classmethod
    def publish_now(cls, app_data, *args, **kwargs):
        tsk = cls({})
        tsk.run(*args, **kwargs)

テスト時には、publish() クラスメソッドを publish_now() に差し替えます。

class SampleTask(BaseTask):
    pass

from unittest import mock, TestCase

class SampleTests(TestCase):
    @mock.patch('SampleTask.publish', SampleTask.publish_now)
    def test_sample(self):
        SampleTask.publish()

バグを直す

ndkale の内部実装を眺めているうちにいくつかのバグに気づいたので本家のリポジトリにプルリクエストを送りました。いずれもいいレスポンスをもらえたのでそのうちマージされると思います。

github.com

メッセージの削除の順序が間違っていてデータロストする可能性があった課題を解決したプルリクエストです。

github.com

タスクの即時実行ができなくなってしまっている課題を解決したプルリクエストです。後方互換性のためにもう少し工夫する必要があるみたいですが、そのうち直します。

最後に

さて、かなり長くなってしまいましたが ndkale を紹介しました。

とても品質の高いプロダクトなので内部実装を読んで勉強してみるのもいいと思います。あと、Celery を SQS で使うのはやめることをおすすめします。