kawasin73のブログ

技術記事とかいろんなことをかくブログです

SIGSTOP シグナルは EINTR を発生させるのか

エラーハンドリングはヌケモレなく、どうもかわしんです。ちゃんとエラーハンドリングしてますか?

以前のブログでも書いた通り、プログラミングする以上発生しうるエラーのうち回復可能なエラーは必ずハンドリングするべきです。

kawasin73.hatenablog.com

で、system call のエラーハンドリングで忘れられがちなのが EINTR (Error INTerRupt) エラーです。

ブロッキングする ("slow" な) システムコール中にシグナルや ptrace が発生して処理が中断された時に返されるエラーで、大抵の場合エラーハンドリングとして同じシステムコールを再度呼び直すことになります。

いちいち EINTR の時だけ再度システムコールを呼びなおすのは面倒くさいので、SA_RESTART というフラグをシグナルハンドラに設定するとシグナルが送られた場合でも EINTR を返すことなく処理を継続してくれます。(ただし、再開不能な場合は EINTR が発生するらしいです。)

さて、シグナルの検証をしていたところ、SIGTERM などのシグナルを送ると確かに EINTR が発生するが、SIGSTOP を送って SIGCONT で再開した時には予想に反して EINTR が発生しないことがわかりました。今回はその謎に迫ります。

SIGSTOP とは

SIGSTOP とはプロセスを一時停止させるためのシグナルです。SIGCONT を送るとプロセスは再開します。

SIGSTOP は特別なシグナルで SIGKILL 同様、シグナルハンドラを設定することもシグナルを無視することもできません。そのため、SA_RESTART をつけるかつけないかを選ぶことはできないです。

カーネルのコードを読む

実際に SIGSTOP が他のシグナルと比べてどのように実装されているのかを Linux のコードを読んで調べます。Linux version は v5.19.17, アーキテクチャx86 を対象としてみます。

とりあえず、SA_RESTART がどのように実装されているかを確かめます。実は Linux のコードでは SA_RESTARThandle_signal() 関数の1ヶ所でしか使われていません。

SA_RESTART がついていない場合は EINTR を返すようにしてますが、そうでない場合は続行するようにしてるっぽいです。

static void
handle_signal(struct ksignal *ksig, struct pt_regs *regs)
{
    bool stepping, failed;
    struct fpu *fpu = &current->thread.fpu;

    if (v8086_mode(regs))
        save_v86_state((struct kernel_vm86_regs *) regs, VM86_SIGNAL);

    /* Are we from a system call? */
    if (syscall_get_nr(current, regs) != -1) {
        /* If so, check system call restarting.. */
        switch (syscall_get_error(current, regs)) {
        case -ERESTART_RESTARTBLOCK:
        case -ERESTARTNOHAND:
            regs->ax = -EINTR;
            break;

        case -ERESTARTSYS:
            if (!(ksig->ka.sa.sa_flags & SA_RESTART)) {
                regs->ax = -EINTR;
                break;
            }
            fallthrough;
        case -ERESTARTNOINTR:
            regs->ax = regs->orig_ax;
            regs->ip -= 2;
            break;
        }
    }

https://elixir.bootlin.com/linux/v5.19.17/source/arch/x86/kernel/signal.c#L805

handle_signal()arch_do_signal_or_restart() から呼び出されています。get_signal() から non-zero な値が返ってきた時に handle_signal() を呼び出し、それ以外の場合は handle_signal()-ERESTARTNOINTR のような処理を行いシステムコールを続行させるみたいです。get_signal()シグナルハンドラが設定されている時に non-zero な値を返す ことになっています。そのため、SIGSTOPSA_RESTART を設定されている時と同じような挙動をする みたいです。

void arch_do_signal_or_restart(struct pt_regs *regs)
{
    struct ksignal ksig;

    if (get_signal(&ksig)) {
        /* Whee! Actually deliver the signal.  */
        handle_signal(&ksig, regs);
        return;
    }

    /* Did we come from a system call? */
    if (syscall_get_nr(current, regs) != -1) {
        /* Restart the system call - no handlers present */
        switch (syscall_get_error(current, regs)) {
        case -ERESTARTNOHAND:
        case -ERESTARTSYS:
        case -ERESTARTNOINTR:
            regs->ax = regs->orig_ax;
            regs->ip -= 2;
            break;

        case -ERESTART_RESTARTBLOCK:
            regs->ax = get_nr_restart_syscall(regs);
            regs->ip -= 2;
            break;
        }
    }

    /*
     * If there's no signal to deliver, we just put the saved sigmask
     * back.
     */
    restore_saved_sigmask();
}

https://elixir.bootlin.com/linux/v5.19.17/source/arch/x86/kernel/signal.c#L865

本を読む

コードを読んで、SIGSTOPSA_RESTART を設定されている時と同じような挙動をすることがわかりました。ただ、これは Linux v5.17.19 の x86 だけの挙動で他の場合はどうかはわかりません。そこで仕様としてどうなっているのかを調べます。

ただ、Linux の仕様は文書としてはないような気がするので本を読みました。

詳解 UNIX プログラミング (第3版)

"Advanced Programming in the UNIX Environment" (Third Edition) の日本語版です。

"10.5 割り込まれたシステムコール" にはおまけみたいな文体で、signal() 関数でシグナルハンドラを確立したときにシステムコールを再開するかどうかは UNIX 実装によって違い、System V ではデフォルトではシステムコールを再開せず、Linux では再開すると書いてありました。つまり、シグナルに対するデフォルトの挙動は Linux では SA_RESTART ということみたいです。

LINUX プログラミング インタフェース

"The Linux Programming Interface" の日本語版です。

"21.5 システムコールへの割り込みと再開" では 2 つの重要なことが書いてありました。

  • いくつかのシステムコールでは SA_RESTART をつけていても再開できず EINTR を返す場合がある
  • SIGSTOP で停止した後に SIGCONT で再開した時にいくつかのシステムコールでは EINTR を発生させる。
    • epoll_pwait()
    • epoll_wait()
    • inotify file descriptor に対する read(), semop(), semtimedop(), sigtimedwait(), sigwaitinfo()

1つ目については、確かに handle_signal() 内の ERESTART_RESTARTBLOCKERESTARTNOHAND の場合に EINTR を設定していたので確認できました。

2つ目については、Linux のコードをざっと最初に眺めただけでは知らなかったので重要な情報でした。SIGSTOPRA_RESTART を有効にした場合と同じような挙動をするものの、例外もあるということでした。

まとめ

SIGSTOPRA_RESTART を有効にした場合と同じような挙動をするものの、例外もあるということでした。頑張って正しく EINTR をハンドリングしましょう。

OS はキャッシュのライトスルーを選ぶのか

メモリとキャッシュと整合性。どうも、かわしんです。

最近「詳解 LINUX カーネル」という分厚い本を読んでいるのですが、割と序盤の第 2 章で気になったことがあったので調べてみました。

ライトスルーとライトバック

ご存知の通り、CPU にはキャッシュがあってメモリアクセスの遅延を高速化しています。 メモリの内容への書き込みではキャッシュとメモリを同時に書き換える「ライトスルー」方式とキャッシュの中身だけ書き換えてメモリへの書き込みは遅延させる「ライトバック」方式があります。

ライトスルーは書き込みのたびにメモリアクセスが発生し遅くなってしまうが簡単に実装できるし、複数段キャッシュの L1 キャッシュで使えばその書き込み遅延も軽減することができます。 一方で、ライトバック方式は高速に書き込みができるけど整合性を保つために実装が複雑になりがちだし、バグが起きやすいかもしれません。

とここまでは Wikipedia にも載っている話です。

ja.wikipedia.org

世の中の文献ではこの二つの方式を比較して両方には一長一短があるよねという話を CPU を作るハードウェアベンダの視点から論じています。

OS がライトスルー方式を選ぶ理由はあるのか

ところが、「詳解 LINUX カーネル」によるとページテーブルエントリには PWT (Page Write-Through) フラグがあり、Pentium では OS がライトスルー方式かライトバック方式かを選ぶことができるらしいです。

僕の認識ではメモリのキャッシュは透過的なモノです。ライトバック方式のデメリットは CPU の実装上のデメリットであり、CPU の利用者である OS の立場からは、ライトバック方式のデメリットはなく高速なライトバック方式を採用しない理由がないです。 実際に Linux では全てのページフレームでキャッシュは有効であり書き込みの際には常にライトバック方式が設定されるそうです。

では逆に、なぜPentium などの CPU はライトバック方式のみではなくライトスルー方式も選択可能な形で実装しているのか、OS がわざわざライトスルー方式を採用するケースがあるのかどうかが気になります。

ということでざっくり調べてみましたが、軽く調べてみた限り CPU を作る立場から比較する記事が多かったです。

論点

結論

はっきりとはわからなかったですが、いい線いってるんじゃないかなと思います。詳しい人いたら教えてください。

新しい技術を追わない

時代はワークライフバランス。どうもかわしんです。

新しい技術は追わない。これは僕の個人的な方針です。別に最先端を追い続けることを否定するわけではありません。

ここでいう新しい技術は、web 界隈の新しいフレームワークとかライブラリとか SaaS とかです。

新しい技術を追い続けるのって大変

休日も時間を費やして勉強するのって大変ですよね。若いうちは Twitter で流れてくるいろんな技術記事を片っ端から貯めて通勤・通学時間に読んでましたが、もう 26 歳になって体力も時間も限られてきました。今はたまった記事も読まなくなったしそもそも興味を惹かれる記事が流れてこなくなって貯まらなくなりました。

必要ないものにモチベーションが湧かない

今勤めてる会社は社内ツールとか社内フレームワーク、ライブラリが溢れているガラパゴスな環境だから、勉強するモチベーションが湧かないです。使わないので。

あと、僕の興味が新陳代謝の激しい User facing な分野から Engineer facing な分野に移ってきたこともあります。

stable なものが好き

数年で陳腐化してしまうものを勉強するのってコスパ悪くないですか?

数年の時を経て淘汰された末に洗練された枯れた技術の方が好きです。そういう技術はドキュメントも整備されてるし、落とし穴も先人が踏み尽くしてくれてるので効率がいいような気がします。知の高速道路をダッシュするみたいな?

あと自分の作るものも数年で陳腐化するようなものを永遠にメンテし続けたくないので、安定したものに依存したいです。一度作ったらほったらかしにできるのが理想。

必要になっても勉強すればいい

新しい技術を追わない、時代においていかれるからといって不安は特にないです。

必要になったらドキュメント読んで勉強すればいいし、それで追いつけるくらいの自信はあります。

ただ、ちょっとアドバイスを求められた時とかに 1 から調べて答えないといけないので、その辺りの初速がガタ落ちするのが大きなデメリットです。やっぱりやるならやるで業界のデファクトに則ってやりたいし、やってほしい。

とはいってもなんだかんだ新しい技術は仕入れちゃうよね

追いはしないですけど、一応知りはします。ざっとホームページとか見てふむふむこんなものね、みたいな。技術の本質を理解しておけば、新しい技術とはいってもその応用なので。

新卒で入社して1年が経った

お久しぶりです。どうも、かわしんです。前回の投稿から丸々1年が経ったのでブログを投稿したいと思います。

1 年間何をしてたのか

1 年前、新卒として某外資系 IT 企業にソフトウェアエンジニアとして就職し、それから某 Android アプリを開発するソフトウェアエンジニアとして黙々と仕事をしていました。

3 年ほど前にアプリケーションエンジニアから離れることを決心して努力を重ねてきましたが、チーム配属の結果アプリケーションエンジニアとして戻って来ることになりました。僕は Android アプリエンジニアとしてエンジニアのキャリアを始めているので 5 年ぶりに原点に戻ってきた形です。正直チームが決まった時は就職せずに自分のやりたい方向を貫くかどうかかなり迷いましたが、社内でのチーム異動の文化があること、英語の環境で働く経験をしてみたかったこと、大企業で働いてみたかったこと、チーム異動によっては外国で働くことができる可能性、充実した福利厚生に惹かれてそのまま就職することにしました。就職して 1 年経ちましたがなんとかクビになることもなく楽しく仕事をできています。

就職するにあたり、1 年間勉強を止めることにしました。思い返せば 7 年前の大学 2 年生の時に起業してから休日も勉強か仕事をしていたのでここで少し休憩するのもいいかなと思ったのが理由です。新しい環境で成果を出すのに集中したいと思ったのもあります。このブログの更新もそれに伴って止まっていました。僕の github ページ の草たちが 2020 年の 4 月を境にパタリと止んでしまっているのは我ながらあからさまだなと思ったりしています。業務の内容が反映されていないのもありますが、土日は毎日遊んでいました。

大きな会社で働く違い

入社してからの大きな違いは、巨大なコードベースの全てを理解することなく変更を加えていくことです。今までの開発は全て自分 1 人か少人数で 1 から作りあげるタイプの開発だったのでそのプロダクトのコードは全て頭に入った状態で開発していましたが、僕が担当するアプリは歴史のあるアプリでそこそこ巨大でたくさんのエンジニアが同時に開発をしているアプリです。最初から全てを理解することは到底不可能で、必要な情報をコードベースの中から探し出して機能を加えていくのは初めてで新しい経験になりました。

暇になった

別の大きな変化として、突然暇になったことがあります。今までは常に仕事として実装する機能や勉強したいことがあったので時間があれば勉強か仕事をしていました。しかし、ワークライフバランスを重視する会社に入ったことで下っ端の僕がやる量は毎日時間内に終わるように管理されており、逆に長く働くことは推奨されません。また勉強することもないので突然夕方からやることがなくなり、暇になったことに最初は困惑していました。最初の方は本を読んだりゲームをしてました。(PUGB mobile とか world of warships blitz とか)。あと youtube を習慣的に観るようになりました。8 月の終わりに引っ越してからはバルコニーが広かったので友達を呼んで少人数でパーティをよくしてました。

勉強をしなくなった理由は、意識的に勉強をしないと決めたこともありますが、勉強するモチベーションがなくなったこともあります。僕の今までの勉強や開発のモチベーションは、僕が必要な物を作ることでした。僕の実生活であったら便利だなとか仕事のプロダクトです。そのプロダクトを作るにあたり必要なライブラリとかを開発していましたが、そのモチベーションは "必要だったから" です。htask管理上手のうさちゃん に必要だったからですし、わざわざ自宅 Kubernetes をやって wanpoll を作ったのも、うさちゃんをコスト低く運用する為でした。 Twilter も自分が楽に twitter を監視する為に作りました。仕事では、ネストした jsoncsv に変換する nested_csv を作ったりしてました。しかし、案外それ以外にもシンプルに興味があったから作ってみたもの (e.g. Cコンパイラとか OS とかトランザクションとか) もありますが。

ぶっちゃけ今の時点でこれ欲しいなって物はないので、これからは自分の興味のある勉強を再開していこうと思ってます。

英語

外資系の会社なので社内のコミュニケーションは全て英語です。またコロナのせいでリモートで仕事してるので日本人同士の雑談みたいなのもほとんどなく全部英語です。自分の入社前の英語スキルは自己紹介をカンペを見ながらやるレベルでとてもきつかったのですが、半年くらいしたらなんとかカンペなしで相手に伝えられる程度には成長できました。とはいえ、日常会話はまだまだ不自由で談笑できるまでではないです。個人的には単語や熟語の知識が欠落しているので単語の勉強を強化していきたいです。

最初は聞き取るのが絶望的にできなくて Google Meets の captions 機能がなかったら今まで会社に残れたかどうか怪しいです。今でも聞き取るのは苦手です。でも会社の人は優しいので聞き直してもゆっくり言ってくれますしなんとかなってます。

まとめ

なんだかまとまりのない文章になりましたが、言いたかったのはこの2つです。

  • この 1 年休憩してました
  • これから頑張ります

よろしくお願いします。なんか facebook に投稿してそうな内容だな。

原因不明のエラーなどない

お前のいう想定外はただの調査不足。どうも、かわしんです。新年度が始まり周りのみんなは働き始めているのを感じますが、僕は4月6日入社なのでもう少しモラトリアムを満喫します。

さて、去年開催された GoCon 2019 Spring で「エラー設計について / Designing Errors」という発表がありました。

docs.google.com

この発表ではエラーを「既知のエラー」「未知のエラー」と分類し、アプリケーションを動かしながら発生した「未知のエラー」に対するハンドリングを追加しながらどんどん「既知のエラー」に変換していくことで、ハンドリングされないエラーを減らしていこうというアプローチを紹介していたものと僕は認識しています。

しかし、具体的にエラーとはどこから発生するのかという説明がなかったため、いつどこから発生するかわからないエラーと戦い続ける必要があるようにも誤解される方がいるかもしれないと思ったのでこの記事を書くことにしました。

結論からいうと、全てのエラーは事前に予測可能であり恐れる必要はありません

前提

ここでは OS の上で動くソフトウェアのプログラミングを対象として扱います。C 言語とか Go とかの高級言語でのプログラミングです。

カーネルの中とか FPGA などのハードウェアプログラミングは対象としませんが本質は同じではないかなと思っています。

エラーの実態

エラーはプログラミング言語や書き手によって表現方法が違いますが、大抵の言語では「返り値」「例外」「シグナル」のどれかまたは複数で表現されます。どれを使うのがいいかということについては色々な考え方があると思うのでこの記事では触れません。

エラーはどこで発生するのか

我々がプログラミングをする上で扱うエラーの発生源は大きく以下の3つに分類できます。

  • 自分で定義して作り出すエラー
  • 関数を呼び出した時に発生するエラー
  • その他の実行時エラー

「自分で定義して作り出すエラー」は内部状態に対して入力値が異常である場合に処理を中断して発生させるエラーです。例えば、入力値のバリデーションエラーであったり、残高がないのにお金を引き出そうとしていた時のエラーなどがわかりやすいと思います。

「関数を呼び出した時に発生するエラー」は呼び出した関数の中で定義されて作り出されたエラーであったり、その関数がさらに別の関数を呼び出して発生したエラーであったりします。これらエラーはそのまま呼び出し元に返されることもありますし、別のエラーに変換されて返されることもあります。

さらに関数が呼び出した関数によるエラーを突き詰めて辿っていくと、全て「関数の作者が定義して作り出すエラー」と「システムコールから発生するエラー」にたどり着きます。システムコールは OS 上で動くソフトウェアとカーネルの境界をつないでいる命令で、 C 言語や Go などの大抵のプログラミング言語では関数の形で定義され、関数呼び出しと同じように扱うことができます。

システムコールがわからない人はコンピュータサイエンスの基本なので調べてみてください。簡単に説明すると、普通のプログラマが書くプログラムは「ユーザランド」という安全な領域で動き基本的にはメモリ演算しかできず、ネットワーク通信やファイル I/O、ディスプレイの出力、ハードウェア機器の操作などは悪いプログラムに乗っ取られるのを防ぐために全て「カーネルランド」という隔離された領域でしか実行できません。しかし、ユーザランドでしか動かないソフトウェアは何もできないので、ユーザランドは「システムコール」を発行してカーネルランドに処理の実行を依頼するという形になっています。

「その他の実行時エラー」は、不正な CPU 命令や不正なメモリアクセスを想定しています。これらが発生した場合は CPU 例外が発生し OS で定義された例外ルーチンが実行されアプリケーションは停止されたりシグナルが飛んできたりします。(詳しくないので間違っていたらすみません。)しかし、これらのエラーは大抵のメモリ安全な高級言語では仕組み上そもそも発生しえないものです。逆に発生するとデバッグは大変です。

また、ゼロ除算も実行時エラーです。言語によってはゼロ除算をした時の挙動は未定義だったりしますが言語によってはゼロ除算をするときにエラーを発生させてくれます。

全てのエラーは定義されている

さて、エラーの発生源を辿っていくと全て自分か誰か(ライブラリの作者など)が定義したエラーかシステムコールから発生するエラーにたどり着きました。

自分か誰か(ライブラリの作者など)が定義したエラーは自明に定義されていますが、システムコールから発生するエラーも全て定義されています。

例えば外部とのネットワーク通信で使うソケットの読み取り命令 recv を見てみましょう。recv man page とググれば以下の Linux のマニュアルが出てきます。または、ターミナルで man 2 recv と実行すると実行した OS のマニュアルが読めるはずです。

https://linuxjm.osdn.jp/html/LDP_man-pages/man2/recv.2.html

ここにはエラーの章があり、全ての発生しうるエラー(EAGAIN EWOULDBLOCK EBADF ECONNREFUSED EFAULT EINTR EINVAL ENOMEM ENOTCONN ENOTSOCK)とそのエラーがどのような時に発生するのかが定義されています。逆に言えばここに定義されていないエラーは絶対に発生しません。

ここまでで、原因不明のエラーなど存在せず全てのエラーが定義されているということがわかったと思います。

エラーハンドリング

さて、(理想的には)全ての関数から発生するエラーは全て定義されているのでそれに対するエラーハンドリングをしていけばいいということになります。

ここでエラーは大きく2つに分類されます。「ハンドリングする(回復可能な)エラー」か「ハンドリングしない(回復不能な)エラー」です。全て定義されているのでそこに想定外のエラーなどありません。

その関数のレイヤーで回復可能な場合はエラーハンドリングをして処理を続行し、そのレイヤーで回復不能な場合はそのエラーを呼び出し元に返し上のレイヤーでエラーハンドリングされることを期待します。

発生したエラーを全て上のレイヤーに戻していればプロセスは異常終了してしまいます。要件の厳しくないアプリケーションサーバなどであれば異常終了させて再起動して後でツギハギの対応をすればいいかもしれませんが、データベースサーバなど一度起動したら数年単位で落ちることが許されないソフトウェアであれば実装時に1つ1つのエラーを精査して回復可能なエラーを拾っていきます。想定外のエラーはただの調査不足です。

しかし現実

しかし、現実はそんなに簡単ではありません。ライブラリから発生するエラーがドキュメントなどで明確に定義されていない場合があります。その場合は冒頭で紹介したような「未知のエラー」を「既知のエラー」に変換していくアプローチが有効だと思います。

一方でもう1つ解決策があります。それは「ライブラリを使わない」ことです。ライブラリを追加すれば追加するほど自分が理解していないコードが増え、把握できていないエラーが発生する可能性が高まります。関数ごとのエラーが定義されていないライブラリは使わずに自分でシステムコールの上にエラー定義されたライブラリを築き上げていくのも有効な解決策だと思います。大変ですが。

言いたかったこと

  • 関数を作るときは発生し得るエラーを定義しろ
  • 全てのエラーを精査して、必要なハンドリングをしろ
  • エラーは全て定義されていて予測可能である。恐れる必要はない

番外編:境界をずらす

エラーの発生源は境界ごとに定義されていることで想定外のエラーのないソフトウェアを作ることができます。アプリケーションとライブラリとの境界(ライブラリ関数呼び出し)、ユーザランドカーネルランドの境界(システムコール)を今回の記事では紹介しました。システムコールについては完全にエラーが定義されドキュメント化されています。

さらに、システムコールの実装の中身をカーネルの中に辿っていくと同様に「カーネルの中で定義されたエラー」と「ハードウェアから発生するエラー」にたどり着きます。ハードウェアから発生するエラーはソフトウェアとハードウェアの境界で定義されたエラーです。

このように境界ごとに明確にエラーを定義することで想定外のないソフトウェアが実現可能になります。

番外編:ハードウェア障害

この記事では暗黙の前提として、ハードウェアが正しく動くことを前提としていました。しかし、ハードウェアも壊れることがあります。メモリが破壊されたり、ストレージが破壊されたり。これらの障害は OS のレイヤーで検知されてエラーとして発生することもありますが、アプリケーションが管理している領域のメモリの値が変わってしまったり、1 + 1 を 3 と CPU が計算してしまったような場合には対応が難しいです。エラーハンドリングの処理自体が正しく動くかわかりません。僕は、普通のソフトウェアを作っている場合はこのような状況はどうしようもないので異常終了するべきだと思いますが、世の中にはこのようなコンピュータが正しく動かないことも想定したソフトウェアの世界があるのかもしれません。(ないのかもしれません)

失敗した AWS Batch ジョブを Slack に通知する (Terraform を使って)

サーバーレスでピタゴラスイッチ。どうも、かわしんです。イベントをサーバレスで繋げてピタゴラスイッチを作るのって案外楽しいもんですね、GUI コンソールで作ってる限りは。

さて、今回は AWS Batch のジョブ実行が失敗した時に Slack に通知する機能を作りたかったのですが、断片的な記事しか見当たらなかったのでこの記事でまとめようと思います。また、今回はインフラ構築ツールとして Terraform を使います。

多分、断片的な記事を普通に繋げてると動かないハマりポイントがあるので、後学の為に注意喚起するという目的もあります。

全体のアーキテクチャ

全体の流れはこんな感じでイベントを繋げていきたいと思います。

Batch -> CloudWatch -> SNS -> Lambda -> Slack

AWS Batch ではジョブの状態が変わるたびにイベントが発生します。 CloudWatch Event Rule を設定してイベントの中から FAILED になったイベントのみをフィルタリングして SNS Topic に流します。

AWS SNS はイベントを受け取ったら Lambda を起動して、Lambda に登録した Node のコードがイベントを整形して Slack に Webhook を叩いて通知するという流れです。

CloudWatch から Lambda を直接起動することもできますが、SNS を経由することで Slack の他にもメールなどの他のチャネルへの通知を拡張することができる為、SNS を経由することにしました。

今回は、この CloudWatch と SNS と Lambda を Terraform を使ってセットアップしていきます。

Lambda で動かすパッケージ

さて、CloudWatch のイベントを Slack へ通知する Lambda Function は標準では用意されていません。イベントの JSON を整形して Slack のコメントとして Webhook を叩く処理を実装する必要がありますが、一から書くのは面倒です。

そこで、aws-to-slack というツールスタックを利用することにしました。

github.com

aws-to-slack は CloudWatch や Lambda のセットアップまでを含めて make deploy を使って簡単に完了させることができるツールスタックです。しかし、AWS リソースのセットアップには CloudFormation を使っている為 Terraform との相性は悪く、勝手に AWS リソースを作られるのも気に入りません。

make package コマンドを実行すれば Lambda で実行する Node のパッケージがコンパイルされる為、Lambda 上で動かすコードだけを aws-to-slack では利用することにしました。

事前に以下のコマンドを実行してコンパイル結果である release.zip を生成します。

$ git clone https://github.com/arabold/aws-to-slack.git
$ cd aws-to-slack
# 生成物 release.zip ができる
$ make package

aws-to-slack ディレクトリに生成された release.zip を任意の S3 バケットにアップロードしておきます。

構築する Terraform

S3

S3 にアップロードした aws-to-slack の release.zip を参照する為に data.aws_s3_bucket_object.lambda_to_slack を作ります。

/*
 * https://www.terraform.io/docs/providers/aws/d/s3_bucket_object.html
 */
data "aws_s3_bucket_object" "lambda_to_slack" {
  bucket = "<bucket_name>"
  key    = "<key_prefix>/release.zip"
}

IAM Role

aws-to-slack の CloudFormation で指定されているような IAM ロールを作ります。

/*
 * https://www.terraform.io/docs/providers/aws/r/iam_role.html
 */
resource "aws_iam_role" "iam_for_lambda" {
  name = "iam-for-lambda"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

/*
 * https://www.terraform.io/docs/providers/aws/r/iam_role_policy_attachment.html
 */
// https://github.com/arabold/aws-to-slack/blob/1451c1beae7b8f635c42161587bddbce04442857/cloudformation.yaml#L68-L70
resource "aws_iam_role_policy_attachment" "iam_for_lambda1" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy_attachment" "iam_for_lambda2" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/CloudWatchReadOnlyAccess"
}

resource "aws_iam_role_policy_attachment" "iam_for_lambda3" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/AWSCodeCommitReadOnly"
}

Lambda

Lambda のセットアップをします。Lambda Function の具体的な設定は aws-to-slack の CloudFormation を参考にしています。

Slack の Webhook URL は https://my.slack.com/apps/manage/custom-integrations にアクセスして生成します。

/*
 * https://www.terraform.io/docs/providers/aws/r/lambda_function.html
 */
// https://github.com/arabold/aws-to-slack/blob/1451c1beae7b8f635c42161587bddbce04442857/cloudformation.yaml#L46-L63
resource "aws_lambda_function" "lambda_to_slack" {
  s3_bucket         = data.aws_s3_bucket_object.lambda_to_slack.bucket
  s3_key            = data.aws_s3_bucket_object.lambda_to_slack.key
  s3_object_version = data.aws_s3_bucket_object.lambda_to_slack.version_id
  function_name     = "lambda-to-slack"
  handler           = "src/index.handler"
  role              = aws_iam_role.iam_for_lambda.arn
  memory_size       = 256
  runtime           = "nodejs10.x"
  # Cross-region metrics lookup requires at least 10s
  timeout = 15
  environment {
    variables = {
      SLACK_CHANNEL  = "<channel_name>"
      SLACK_HOOK_URL = "https://hooks.slack.com/<hook_api_path>"
    }
  }
}

/*
 * https://www.terraform.io/docs/providers/aws/r/lambda_permission.html
 */
resource "aws_lambda_permission" "sns_to_lambda_to_slack" {
  statement_id  = "lambda-to-slack"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_to_slack.function_name
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.cloudwatch_slack.arn
}

CloudWatch

resource.aws_cloudwatch_event_rule.batch_failed_job を設定して Batch から発生するイベントをフィルタリングします。これは以下の AWS 公式のチュートリアルを参考にしました。

参考 : チュートリアル: 失敗したジョブイベントに関する Amazon Simple Notification Service アラートを送信する

/*
 * https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_rule.html
 */
resource "aws_cloudwatch_event_rule" "batch_failed_job" {
  name = "batch-failed-job"

  event_pattern = <<PATTERN
{
  "detail-type": [
    "Batch Job State Change"
  ],
  "source": [
    "aws.batch"
  ],
  "detail": {
    "status": [
      "FAILED"
    ]
  }
}
PATTERN
}

/*
 * https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_target.html
 */
resource "aws_cloudwatch_event_target" "sns_slack" {
  rule = aws_cloudwatch_event_rule.batch_failed_job.name
  arn  = aws_sns_topic.cloudwatch_slack.arn
}

SNS

SNS の設定はこんな感じになります。resource.aws_sns_topic.cloudwatch_slack.policy については後述します。

/*
 * https://www.terraform.io/docs/providers/aws/r/sns_topic.html
 */
resource "aws_sns_topic" "cloudwatch_slack" {
  name = "cloudwatch-to-slack"

  // https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/resource-based-policies-cwe.html#sns-permissions
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:<region_name>:<account_id>:cloudwatch-to-slack",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account_id>"
        }
      }
    },
    {
        "Sid": "TrustCWEToPublishEventsToMyTopic",
        "Effect": "Allow",
        "Principal": {
            "Service": "events.amazonaws.com"
        },
        "Action": "sns:Publish",
        "Resource": "arn:aws:sns:<region_name>:<account_id>:cloudwatch-to-slack"
    }
  ]
}
POLICY
}

/*
 * https://www.terraform.io/docs/providers/aws/r/sns_topic_subscription.html
 */
resource "aws_sns_topic_subscription" "cloudwatch_slack" {
  topic_arn = aws_sns_topic.cloudwatch_slack.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.lambda_to_slack.arn
}

ハマりどころ:SNS トピックの Policy

さて、最初の時点では aws_sns_topicpolicy は指定していませんでした。その状態で試してみると、CloudWatch Event Rule から SNS Topic への送信に失敗します。

さらに、AWSGUI コンソールから CloudWatch Event Rule を作ると Terraform で作ったルールと主導で作ったルールの両方でイベントの送信に成功するようになります。そのあとに主導で作ったものを削除して Terraform で作ったものだけに戻すと失敗するようになります。

ここで Terraform の aws_cloudwatch_event_target のドキュメントをよく読むと以下のように注意書きがされています。

Note: In order to be able to have your AWS Lambda function or SNS topic invoked by a CloudWatch Events rule, you must setup the right permissions using aws_lambda_permission or aws_sns_topic.policy . More info here .

https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_target.html

つまり、SNS トピックに送信する場合には aws_sns_topicpolicy を指定する必要があるということです。しかし、具体的に何を書けばいいのかは教えてくれず無造作に AWS のドキュメント へのリンクが置かれているだけです。

このドキュメントを読むと、一度 SNS トピックを作った後でその Policy に以下の Policy を追加するように書かれていました。

{
  "Sid": "TrustCWEToPublishEventsToMyTopic",
  "Effect": "Allow",
  "Principal": {
    "Service": "events.amazonaws.com"
  },
  "Action": "sns:Publish",
  "Resource": "arn:aws:sns:region:account-id:topic-name"
}

そこで実際に作成した SNS トピックの Policy を調べて前述の resource.aws_sns_topic.cloudwatch_slack.policy が完成しました。

しかし、この Policy の中には SNS トピックの ARN が含まれています。Terraform のように1度のオペレーションで Immutable にリソースを作成する場合は事前にリソースの ARN を参照することは自己参照になりできません。

つまり、これを綺麗に Terraform で実現することはできないのです。だから、Terraform のドキュメントでは具体的な手法を説明するのではなく無造作にリンクが置かれているだけなのだと理解しました。不親切ですが。

一方で、アカウント名と SNS トピックの名前がわかっている場合はリソースの生成前に ARN を予測することができるため、今回は直接指定して事なきを得ました。

最後に

これで AWS Batch の失敗ジョブを Slack に通知することができるようになりました。初めてサーバーレスというものを使ってみましたが案外面白かったです。

ただ、GUI だと裏側でよしなにやっているごにょごにょした部分を自分で実装する必要があり Terraform などを使うと案外めんどくさいことがわかりました。

AWS Batch はジョブ起動のレイテンシーを考えない場合はコスト低くできる(低いとはいってない)方法なので使ってみてはいかがでしょうか?(AWS Batch ジョブの SSM 連携がないのでハマりました)

SQS を使った Python の非同期ワーカーは ndkale しかない

誰一人見捨てない!!! どうも、かわしんです。Celery は見捨てるんです。

この記事は Pythonその2 Advent Calendar 2019 の 15 日目の記事です。

やや強めのタイトルですが、AWS SQS を使った非同期ワーカーでまともな実装は ndkale しかないという内容です。Celery は論外です。

github.com

前半はディスってばっかりなので、ndkale のことだけを知りたい場合は途中の「大本命 ndkale」から読んでください

前提としての欲しい機能

まず、諸々をディスる前に非同期ワーカーとして欲しい機能をあげておきます。

  • 正しく SQS を使って信頼性のあるタスク実行をする
  • 即時再実行をする
  • 複数のキューを使い分ける。また同じタスクでも動的に利用するキューを切り替えたい
  • Dead Letter Queue も使えると嬉しい

まず Celeryディスる

Python の非同期処理基盤として有名な OSS プロダクトとして Celery があります。

それなりにたくさんの人が使っていて、良さげな雰囲気があります。

Celery では Broker のうち RabbitMQRedisSQS の 3 つを Stable としてサポートしています。

Brokers — Celery 4.3.0 documentation

そこで SQS を Celery で使って試していました。

Celery のダメなところ

当たり前ですが、OSS を使うときはその中身のコードをざっと読んで本番環境で使える品質のものなのかを確かめます。

しかし、Celery は内部のコードが全く読めません 。どこが起動時のエンドポイントなのか、どこで Broker を切り分けているのか、コアとなる処理はどこに書いてあるのか、コードが難解すぎて全くわかりません。いざエントリーポイントを見つけてコードを追っていってもあっという間に呼び出す関数を見失ってしまいます。

ただ、たくさんの人が使っている信頼性があると思われるプロダクトだし、中のコードは読めないけど色々なケースを考慮して作られていることは読み取れたので Celery を使っていました。

Celery + SQS の致命的にダメなところ

Celeryデバッグログをだしながら使っていて気づいたことなのですが、Celery はプロセスの強制終了でメッセージをロスト します。これは信頼性のあるメッセージ伝達を行う SQS を使うワーカーとしては致命的です。もはや本番環境では使えない品質です。

まず、SQS はメッセージの At Least Once での伝達(FIFO キューの場合は Exactly Once)を実現するために以下の手順でメッセージの購読を行うことを想定しています。

  1. メッセージを SQS からタイムアウト付き(可視性タイムアウト)で 取得
  2. メッセージの内容でタスクを実行
  3. タスクが完了したら SQS からメッセージを 削除
  4. もしタスクの再実行が必要な場合は、タイムアウトによって自動的に復帰するのを待つか、タイムアウト0 に更新して即時復帰させる

タスクの実行が完了した後にメッセージを削除するので、タスク実行中にプロセスが突然終了した場合でもメッセージはタイムアウトの時間が経過後に復活して再実行されます。

もちろん、タスクの実行完了とメッセージの削除の間でプロセスが死ぬ場合もあるので、タスクは冪等に作る必要があります。

さて、Celeryデバッグログを観察する限り以下のような順序で処理をしているようでした。

  1. メッセージを SQS から 取得
  2. メッセージを 削除
  3. タスクを実行
  4. もしタスクの再実行が必要な場合は、新しいメッセージを SQS に送信

メッセージを削除するタイミングが早すぎます。もし、タスクの実行中にプロセスが突然終了した場合にはそのタスクは再実行されることなく、データのロストに繋がる可能性があります。

Celery を SQS で使う限りメッセージの信頼性はない です。

これはあくまでもデバッグログの順序を観察して僕が推測したことなので違うかもしれませんが、確かめようにも内部のコードが読めないので仕方ないです。

Celery は複数の Broker に対応しているため Broker のモデルとしての抽象化が難しく、妥協としてこのような実装になっていると思うので仕方ないとも言えます。が、少なくとも本番環境で使えるような品質ではないので Stable と表記するのはやめていただきたいです。

そのほかのライブラリをざっとディスる

さて、本命の ndkale にたどり着くまでにたくさんの SQS を使った非同期ワーカーライブラリを調査してその内部コードを読んだので、それぞれディスっていきます。もちろんそれぞれいいところはあるのですが、そこまで書くと長くなるので採用しなかった理由だけを書いていきます。

大本命 ndkale

さて、調査の結果 ndkale が実装の品質が高く、手を入れれば本番環境でも使えそうだったので利用しました。

engblog.nextdoor.com

概要はこのブログに記載されていますが、Celery で苦しんだ末に一から非同期ワーカーを実装されたようです。このブログ当時で 100万/day 単位のタスクを分散環境で処理している実績があります。

プレフィックスの nd は作成した会社の NextDoor からきているんだと思います。パッケージ名自体は kale です。2014 年に作られたライブラリで 5 年間運用されているようで、現在でもメンテナンスされているみたいです。

ndkale の最大の特徴は、優先度付きキュー に対応していることです。実現するために Lottery Algorithm を拡張した独自の Reduced Lottery というアルゴリズムでスタベーションなくメッセージを処理します。

もちろん、SQS を正しく使ってデータのロストがおこらないように実装しています(実際にはバグがありましたが、僕が修正 PR を投げました)。再実行についてはリトライ回数をインクリメントしたメッセージを再送信してからメッセージを消すようにしています。SQS のメッセージは Immutable なので再送信をしているようです。SQS が用意している Dead Letter Queue の仕組みは利用せず、独自に再実行回数を判別して Dead Letter Queue へ送信しています。

内部実装を読みましたが、設計と実装の品質がとても高い です。綺麗に書かれているのでスラスラ読めました。設計もよく考えられていて、利用者による機能の拡張がユーザーに露出するコンポーネントである「Task」と「Worker」を継承することで可能なような設計になっています。また、Task と Worker には様々なフックが用意されており、処理の注入もやりやすいです。

一方で、詳しいドキュメントがない という欠点もあります。簡単な使い方と ndkale の概念が README に書かれているだけで、手の込んだ使い方は自分で内部実装を読んで理解するしかありません。しかし、内部実装が品質が高くとてもわかりやすいので、読んでいるだけで 作者の気持ちがわかり、こうやって使えばいいんだなとか、ここを拡張すればいいんだな、みたいなことが読み取れます。読むと感動すると思います。すごい。

あと、仕様上の課題ですが、複数のキューに対応した時に メッセージがあるにも関わらず最大 20 秒間処理が止まってしまう 可能性があるという課題があります。 全てのキューにメッセージがない時にはランダムに選ばれたキューを Long Polling してメッセージを待ちます。しかし、その間に別のキューにメッセージがきても気づくことができず、Long Polling の最大時間である 20 秒間処理されない可能性があります。

この課題は頻繁にメッセージがくるような成熟したサービスであればそもそも Long Polling する時間も短く済むため問題にはなりませんが、全てのキューでメッセージの頻度が少ない場合に問題になります。解決策としては 複数キューでは使わずに単独のキューのみを監視するように使う 方法しかありません。最大の売りである優先度付きキューの機能は無視することになりますが、そのほかの単純なワーカーとしての品質が高いため ndkale を単独キューで利用します。

ndkale に足りないこと

ここまで大絶賛の ndkale ですが、残念ながらこのまま使うには不十分です。ただし、設計と実装の品質が高いため簡単に足りない機能を追加することができます。

単独キューを待って、複数のキューに送信する

Long Polling による別のキューのメッセージ遅延を防ぐためにワーカーは 1 つのキューのみに対してメッセージの監視をします。一方で、タスクの役割ごとに利用するキューは変えたいため、1 つのキューを監視して、複数のキューのうちのどれかに送信するということをしたいです。

残念ながら ndkale では、受信しているキューへ対してしか送信できないため、キューの設定ファイルを 2 つ用意して対応します。

ndkale では、キューの情報は QueueInfo のクラス変数に保持されています。内部実装を見る限り、受信用のキューは QueueInfo._queues から参照し、送信用のキューは QueueInfo._simple_name_queues_map から参照しているので、まず受信用のキューをローディングした後に、送信用のキューを QueueInfo._simple_name_queues_map に追加することで解決します。

from kale import settings, utils, worker
from kale.queue_info import QueueInfo

queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
# load queues for dequeue
QueueInfo(
    config_file=settings.QUEUE_CONFIG,
    sqs_talk=None,
    queue_cls=queue_class
)

# load queues for enqueue
queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
for q in queues:
    if q.simple_name not in QueueInfo._simple_name_queues_map:
        QueueInfo._simple_name_queues_map[q.simple_name] = q

キューの動的な選択

タスクは kale.Task クラスを継承して定義します。送信するキューの名前は継承したタスククラスの queue というクラス変数に設定しますが、これで固定されてしまいます。

タスクの publish() で動的に送信するキューを変更したいのですが、単にクラス変数を上書きするだけでは別プロセスで実行されるワーカーでリトライ時に再送信されるキューがデフォルトのものに戻ってしまうため、うまくいきません。並行処理での問題もあります。

そこで、メッセージが用意している app_data にキューの名前を指定してそれをワーカーにも伝搬させる方針をとることにしました。キューの送信を行う Publisher を差し替えることで実現しています。Publisher の差し替えも Task の継承のオーバーライドでできるように設計されていてすごいです。

from kale import publisher, task

class TaskClass:
    def __init__(self, task_class, module=None, name=None, queue=None, time_limit=None):
        self.__module__ = module or task_class.__module__
        self.__name__ = name or task_class.__name__
        self.queue = queue or task_class.queue
        self.time_limit = time_limit or task_class.time_limit

class Publisher(publisher.Publisher):
    def publish(self, task_class, task_id, payload, current_retry_num=None, delay_sec=None):
        """
        Override original method to inject TaskClass
        """
        app_data = payload.get('app_data')
        if isinstance(app_data, dict):
            task_class = TaskClass(task_class, queue=app_data.get('queue'))
        super(Publisher, self).publish(task_class, task_id, payload, current_retry_num=current_retry_num,
                                       delay_sec=delay_sec)

task_publisher = None

class BaseTask(task.Task):
    @staticmethod
    def _get_publisher():
        """
        Override original class method to inject custom Publisher
        """
        global task_publisher
        if task_publisher is None:
            task_publisher = Publisher()
        return task_publisher


class SomeTask(BaseTask):
    pass

# dynamic queue selection for enqueue
SomeTask.publish({'queue': 'not_default_queue'}, msg)

再実行時刻を指定するリトライエラー

タスク実行時にエラーが発生した場合は Task._get_delay_sec_for_retry(current_retry_num) で計算される秒数待って再実行するようにして、exponential backoff に対応しています。

しかし、エラーに設定した時刻に再実行されるほうが扱いやすいので、retry_at を設定したエラーを送出することでその時刻に再実行するように DelaySeconds を設定して欲しいです。

残念ながら、ndkale はエラーの種類による動的なリトライ時間の計算は想定していなかったようで、ndkale 本体に手を入れなければこれは実現できません。

# kale.task.Task
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        return cls._get_delay_sec_for_retry_with_message(message.task_retry_num)

    def handle_failure(cls, message, raised_exception):
        # delay_sec = cls._get_delay_sec_for_retry(message.task_retry_num)
        delay_sec = cls._get_delay_sec_for_retry_with_message(message, raised_exception)

https://github.com/aces-inc/ndkale/commit/4787a8a0218fe575621bccc7752635b291e61c39#diff-9230566b060f8c0681ff4527bc9e483aR174

ndkale をこのように修正した上で以下のように Task を拡張して _get_delay_sec_for_retry_with_message() 関数をオーバーライドすればリトライ時刻の指定が可能になります。

class RetryError(Exception):
    def __init__(self, msg, retry_at=None):
        """
        retry_at must be set tzinfo
        """
        self.msg = msg
        self.retry_at = retry_at

class BaseTask(task.Task):
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        if isinstance(raised_exception, RetryError) and raised_exception.retry_at is not None:
            now = timezone.now()
            if raised_exception.retry_at < now:
                # retry immediately
                return 0
            diff = raised_exception.retry_at - now
            if diff.seconds > 900:
                # SQS max delay_time is 15 minutes (900 seconds)
                return 900
            else:
                seconds = diff.seconds
                if diff.microseconds > 0:
                    seconds += 1
                return seconds

        return super(BaseTask, cls)._get_delay_sec_for_retry_with_message(message, raised_exception)

retry_at0 の場合は即時再実行、None の場合は従来の exponential backoff でリトライします。また、SQS の DelaySeconds の最大値は 900 秒であるため、それより長い場合には 900 秒に切り詰めます。また秒以下の値は切り上げになります。

Django で使う

Django と組み合わせて使うためには、Django のコンテキストをうまく初期化して使う必要があります。Django の初期化をしつつ Worker を単独で起動する方法として、manage.py でコマンドとして起動する方法を採用します。

また、タスク内で Django のデータベースへのアクセスをする場合は、コネクションプールにあるデータベースへのコネクションを気にする必要があります。コネクションは、データベースに設定されたアイドル時間によるタイムアウトや大きすぎるサイズのクエリなどによって強制的に切断されてしまう場合があります。

これらの切断されたコネクションをそのまま使うと以下のようなエラーが発生します。

(2006, 'MySQL server has gone away')

これを解決するためには切断されたコネクションをコネクションプールから排除する必要がありますが、Django は自動でコネクションプールの死活管理をしてくれません。そのため、タスクの実行後に close_old_connections() を呼び出してコネクションプール内の切断されたコネクションを排除します。

Django のコマンドとして呼び出すために、<package_name>/management/commands/worker.py に以下のようなコマンドを定義します。

from __future__ import absolute_import, print_function

import logging

from django.core.management.base import BaseCommand
from django.db import close_old_connections
from kale import settings, utils, worker
from kale.queue_info import QueueInfo

logger = logging.getLogger(__name__)

class DjangoWorker(worker.Worker):
    def _on_task_succeeded(self, message, time_remaining_sec):
        super()._on_task_succeeded(message, time_remaining_sec)
        # cleanup db connection pool for each task
        close_old_connections()

    def _on_task_failed(self, message, time_remaining_sec, err, permanent_failure):
        super()._on_task_failed(message, time_remaining_sec, err, permanent_failure)
        # cleanup db connection pool for each task
        close_old_connections()

class Command(BaseCommand):
    help = 'worker'

    def handle(self, *args, **options):
        queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
        # load queues for dequeue
        QueueInfo(
            config_file=settings.QUEUE_CONFIG,
            sqs_talk=None,
            queue_cls=queue_class
        )

        # load queues for enqueue
        queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
        for q in queues:
            if q.simple_name not in QueueInfo._simple_name_queues_map:
                QueueInfo._simple_name_queues_map[q.simple_name] = q

        logger.info('Task worker is running ...')
        DjangoWorker().run()

python manage.py worker と実行すればワーカーが起動されます。

テストで即時実行したい

非同期処理をテストすることはいつそのタスクが実行されるかを制御する必要があって難しいです。そのため、テストの時だけ非同期処理を同期的に実行したくなります。

残念ながら ndkale にはテスト時にだけ同期的に処理する仕組みはないですが、以下のように publish_now() クラスメソッドを Task に追加することで実現できます。

class BaseTask(task.Task):
    @classmethod
    def publish_now(cls, app_data, *args, **kwargs):
        tsk = cls({})
        tsk.run(*args, **kwargs)

テスト時には、publish() クラスメソッドを publish_now() に差し替えます。

class SampleTask(BaseTask):
    pass

from unittest import mock, TestCase

class SampleTests(TestCase):
    @mock.patch('SampleTask.publish', SampleTask.publish_now)
    def test_sample(self):
        SampleTask.publish()

バグを直す

ndkale の内部実装を眺めているうちにいくつかのバグに気づいたので本家のリポジトリにプルリクエストを送りました。いずれもいいレスポンスをもらえたのでそのうちマージされると思います。

github.com

メッセージの削除の順序が間違っていてデータロストする可能性があった課題を解決したプルリクエストです。

github.com

タスクの即時実行ができなくなってしまっている課題を解決したプルリクエストです。後方互換性のためにもう少し工夫する必要があるみたいですが、そのうち直します。

最後に

さて、かなり長くなってしまいましたが ndkale を紹介しました。

とても品質の高いプロダクトなので内部実装を読んで勉強してみるのもいいと思います。あと、Celery を SQS で使うのはやめることをおすすめします。