kawasin73のブログ

技術記事とかいろんなことをかくブログです

SQS を使った Python の非同期ワーカーは ndkale しかない

誰一人見捨てない!!! どうも、かわしんです。Celery は見捨てるんです。

この記事は Pythonその2 Advent Calendar 2019 の 15 日目の記事です。

やや強めのタイトルですが、AWS SQS を使った非同期ワーカーでまともな実装は ndkale しかないという内容です。Celery は論外です。

github.com

前半はディスってばっかりなので、ndkale のことだけを知りたい場合は途中の「大本命 ndkale」から読んでください

前提としての欲しい機能

まず、諸々をディスる前に非同期ワーカーとして欲しい機能をあげておきます。

  • 正しく SQS を使って信頼性のあるタスク実行をする
  • 即時再実行をする
  • 複数のキューを使い分ける。また同じタスクでも動的に利用するキューを切り替えたい
  • Dead Letter Queue も使えると嬉しい

まず Celeryディスる

Python の非同期処理基盤として有名な OSS プロダクトとして Celery があります。

それなりにたくさんの人が使っていて、良さげな雰囲気があります。

Celery では Broker のうち RabbitMQRedisSQS の 3 つを Stable としてサポートしています。

Brokers — Celery 4.3.0 documentation

そこで SQS を Celery で使って試していました。

Celery のダメなところ

当たり前ですが、OSS を使うときはその中身のコードをざっと読んで本番環境で使える品質のものなのかを確かめます。

しかし、Celery は内部のコードが全く読めません 。どこが起動時のエンドポイントなのか、どこで Broker を切り分けているのか、コアとなる処理はどこに書いてあるのか、コードが難解すぎて全くわかりません。いざエントリーポイントを見つけてコードを追っていってもあっという間に呼び出す関数を見失ってしまいます。

ただ、たくさんの人が使っている信頼性があると思われるプロダクトだし、中のコードは読めないけど色々なケースを考慮して作られていることは読み取れたので Celery を使っていました。

Celery + SQS の致命的にダメなところ

Celeryデバッグログをだしながら使っていて気づいたことなのですが、Celery はプロセスの強制終了でメッセージをロスト します。これは信頼性のあるメッセージ伝達を行う SQS を使うワーカーとしては致命的です。もはや本番環境では使えない品質です。

まず、SQS はメッセージの At Least Once での伝達(FIFO キューの場合は Exactly Once)を実現するために以下の手順でメッセージの購読を行うことを想定しています。

  1. メッセージを SQS からタイムアウト付き(可視性タイムアウト)で 取得
  2. メッセージの内容でタスクを実行
  3. タスクが完了したら SQS からメッセージを 削除
  4. もしタスクの再実行が必要な場合は、タイムアウトによって自動的に復帰するのを待つか、タイムアウト0 に更新して即時復帰させる

タスクの実行が完了した後にメッセージを削除するので、タスク実行中にプロセスが突然終了した場合でもメッセージはタイムアウトの時間が経過後に復活して再実行されます。

もちろん、タスクの実行完了とメッセージの削除の間でプロセスが死ぬ場合もあるので、タスクは冪等に作る必要があります。

さて、Celeryデバッグログを観察する限り以下のような順序で処理をしているようでした。

  1. メッセージを SQS から 取得
  2. メッセージを 削除
  3. タスクを実行
  4. もしタスクの再実行が必要な場合は、新しいメッセージを SQS に送信

メッセージを削除するタイミングが早すぎます。もし、タスクの実行中にプロセスが突然終了した場合にはそのタスクは再実行されることなく、データのロストに繋がる可能性があります。

Celery を SQS で使う限りメッセージの信頼性はない です。

これはあくまでもデバッグログの順序を観察して僕が推測したことなので違うかもしれませんが、確かめようにも内部のコードが読めないので仕方ないです。

Celery は複数の Broker に対応しているため Broker のモデルとしての抽象化が難しく、妥協としてこのような実装になっていると思うので仕方ないとも言えます。が、少なくとも本番環境で使えるような品質ではないので Stable と表記するのはやめていただきたいです。

そのほかのライブラリをざっとディスる

さて、本命の ndkale にたどり着くまでにたくさんの SQS を使った非同期ワーカーライブラリを調査してその内部コードを読んだので、それぞれディスっていきます。もちろんそれぞれいいところはあるのですが、そこまで書くと長くなるので採用しなかった理由だけを書いていきます。

大本命 ndkale

さて、調査の結果 ndkale が実装の品質が高く、手を入れれば本番環境でも使えそうだったので利用しました。

engblog.nextdoor.com

概要はこのブログに記載されていますが、Celery で苦しんだ末に一から非同期ワーカーを実装されたようです。このブログ当時で 100万/day 単位のタスクを分散環境で処理している実績があります。

プレフィックスの nd は作成した会社の NextDoor からきているんだと思います。パッケージ名自体は kale です。2014 年に作られたライブラリで 5 年間運用されているようで、現在でもメンテナンスされているみたいです。

ndkale の最大の特徴は、優先度付きキュー に対応していることです。実現するために Lottery Algorithm を拡張した独自の Reduced Lottery というアルゴリズムでスタベーションなくメッセージを処理します。

もちろん、SQS を正しく使ってデータのロストがおこらないように実装しています(実際にはバグがありましたが、僕が修正 PR を投げました)。再実行についてはリトライ回数をインクリメントしたメッセージを再送信してからメッセージを消すようにしています。SQS のメッセージは Immutable なので再送信をしているようです。SQS が用意している Dead Letter Queue の仕組みは利用せず、独自に再実行回数を判別して Dead Letter Queue へ送信しています。

内部実装を読みましたが、設計と実装の品質がとても高い です。綺麗に書かれているのでスラスラ読めました。設計もよく考えられていて、利用者による機能の拡張がユーザーに露出するコンポーネントである「Task」と「Worker」を継承することで可能なような設計になっています。また、Task と Worker には様々なフックが用意されており、処理の注入もやりやすいです。

一方で、詳しいドキュメントがない という欠点もあります。簡単な使い方と ndkale の概念が README に書かれているだけで、手の込んだ使い方は自分で内部実装を読んで理解するしかありません。しかし、内部実装が品質が高くとてもわかりやすいので、読んでいるだけで 作者の気持ちがわかり、こうやって使えばいいんだなとか、ここを拡張すればいいんだな、みたいなことが読み取れます。読むと感動すると思います。すごい。

あと、仕様上の課題ですが、複数のキューに対応した時に メッセージがあるにも関わらず最大 20 秒間処理が止まってしまう 可能性があるという課題があります。 全てのキューにメッセージがない時にはランダムに選ばれたキューを Long Polling してメッセージを待ちます。しかし、その間に別のキューにメッセージがきても気づくことができず、Long Polling の最大時間である 20 秒間処理されない可能性があります。

この課題は頻繁にメッセージがくるような成熟したサービスであればそもそも Long Polling する時間も短く済むため問題にはなりませんが、全てのキューでメッセージの頻度が少ない場合に問題になります。解決策としては 複数キューでは使わずに単独のキューのみを監視するように使う 方法しかありません。最大の売りである優先度付きキューの機能は無視することになりますが、そのほかの単純なワーカーとしての品質が高いため ndkale を単独キューで利用します。

ndkale に足りないこと

ここまで大絶賛の ndkale ですが、残念ながらこのまま使うには不十分です。ただし、設計と実装の品質が高いため簡単に足りない機能を追加することができます。

単独キューを待って、複数のキューに送信する

Long Polling による別のキューのメッセージ遅延を防ぐためにワーカーは 1 つのキューのみに対してメッセージの監視をします。一方で、タスクの役割ごとに利用するキューは変えたいため、1 つのキューを監視して、複数のキューのうちのどれかに送信するということをしたいです。

残念ながら ndkale では、受信しているキューへ対してしか送信できないため、キューの設定ファイルを 2 つ用意して対応します。

ndkale では、キューの情報は QueueInfo のクラス変数に保持されています。内部実装を見る限り、受信用のキューは QueueInfo._queues から参照し、送信用のキューは QueueInfo._simple_name_queues_map から参照しているので、まず受信用のキューをローディングした後に、送信用のキューを QueueInfo._simple_name_queues_map に追加することで解決します。

from kale import settings, utils, worker
from kale.queue_info import QueueInfo

queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
# load queues for dequeue
QueueInfo(
    config_file=settings.QUEUE_CONFIG,
    sqs_talk=None,
    queue_cls=queue_class
)

# load queues for enqueue
queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
for q in queues:
    if q.simple_name not in QueueInfo._simple_name_queues_map:
        QueueInfo._simple_name_queues_map[q.simple_name] = q

キューの動的な選択

タスクは kale.Task クラスを継承して定義します。送信するキューの名前は継承したタスククラスの queue というクラス変数に設定しますが、これで固定されてしまいます。

タスクの publish() で動的に送信するキューを変更したいのですが、単にクラス変数を上書きするだけでは別プロセスで実行されるワーカーでリトライ時に再送信されるキューがデフォルトのものに戻ってしまうため、うまくいきません。並行処理での問題もあります。

そこで、メッセージが用意している app_data にキューの名前を指定してそれをワーカーにも伝搬させる方針をとることにしました。キューの送信を行う Publisher を差し替えることで実現しています。Publisher の差し替えも Task の継承のオーバーライドでできるように設計されていてすごいです。

from kale import publisher, task

class TaskClass:
    def __init__(self, task_class, module=None, name=None, queue=None, time_limit=None):
        self.__module__ = module or task_class.__module__
        self.__name__ = name or task_class.__name__
        self.queue = queue or task_class.queue
        self.time_limit = time_limit or task_class.time_limit

class Publisher(publisher.Publisher):
    def publish(self, task_class, task_id, payload, current_retry_num=None, delay_sec=None):
        """
        Override original method to inject TaskClass
        """
        app_data = payload.get('app_data')
        if isinstance(app_data, dict):
            task_class = TaskClass(task_class, queue=app_data.get('queue'))
        super(Publisher, self).publish(task_class, task_id, payload, current_retry_num=current_retry_num,
                                       delay_sec=delay_sec)

task_publisher = None

class BaseTask(task.Task):
    @staticmethod
    def _get_publisher():
        """
        Override original class method to inject custom Publisher
        """
        global task_publisher
        if task_publisher is None:
            task_publisher = Publisher()
        return task_publisher


class SomeTask(BaseTask):
    pass

# dynamic queue selection for enqueue
SomeTask.publish({'queue': 'not_default_queue'}, msg)

再実行時刻を指定するリトライエラー

タスク実行時にエラーが発生した場合は Task._get_delay_sec_for_retry(current_retry_num) で計算される秒数待って再実行するようにして、exponential backoff に対応しています。

しかし、エラーに設定した時刻に再実行されるほうが扱いやすいので、retry_at を設定したエラーを送出することでその時刻に再実行するように DelaySeconds を設定して欲しいです。

残念ながら、ndkale はエラーの種類による動的なリトライ時間の計算は想定していなかったようで、ndkale 本体に手を入れなければこれは実現できません。

# kale.task.Task
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        return cls._get_delay_sec_for_retry_with_message(message.task_retry_num)

    def handle_failure(cls, message, raised_exception):
        # delay_sec = cls._get_delay_sec_for_retry(message.task_retry_num)
        delay_sec = cls._get_delay_sec_for_retry_with_message(message, raised_exception)

https://github.com/aces-inc/ndkale/commit/4787a8a0218fe575621bccc7752635b291e61c39#diff-9230566b060f8c0681ff4527bc9e483aR174

ndkale をこのように修正した上で以下のように Task を拡張して _get_delay_sec_for_retry_with_message() 関数をオーバーライドすればリトライ時刻の指定が可能になります。

class RetryError(Exception):
    def __init__(self, msg, retry_at=None):
        """
        retry_at must be set tzinfo
        """
        self.msg = msg
        self.retry_at = retry_at

class BaseTask(task.Task):
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        if isinstance(raised_exception, RetryError) and raised_exception.retry_at is not None:
            now = timezone.now()
            if raised_exception.retry_at < now:
                # retry immediately
                return 0
            diff = raised_exception.retry_at - now
            if diff.seconds > 900:
                # SQS max delay_time is 15 minutes (900 seconds)
                return 900
            else:
                seconds = diff.seconds
                if diff.microseconds > 0:
                    seconds += 1
                return seconds

        return super(BaseTask, cls)._get_delay_sec_for_retry_with_message(message, raised_exception)

retry_at0 の場合は即時再実行、None の場合は従来の exponential backoff でリトライします。また、SQS の DelaySeconds の最大値は 900 秒であるため、それより長い場合には 900 秒に切り詰めます。また秒以下の値は切り上げになります。

Django で使う

Django と組み合わせて使うためには、Django のコンテキストをうまく初期化して使う必要があります。Django の初期化をしつつ Worker を単独で起動する方法として、manage.py でコマンドとして起動する方法を採用します。

また、タスク内で Django のデータベースへのアクセスをする場合は、コネクションプールにあるデータベースへのコネクションを気にする必要があります。コネクションは、データベースに設定されたアイドル時間によるタイムアウトや大きすぎるサイズのクエリなどによって強制的に切断されてしまう場合があります。

これらの切断されたコネクションをそのまま使うと以下のようなエラーが発生します。

(2006, 'MySQL server has gone away')

これを解決するためには切断されたコネクションをコネクションプールから排除する必要がありますが、Django は自動でコネクションプールの死活管理をしてくれません。そのため、タスクの実行後に close_old_connections() を呼び出してコネクションプール内の切断されたコネクションを排除します。

Django のコマンドとして呼び出すために、<package_name>/management/commands/worker.py に以下のようなコマンドを定義します。

from __future__ import absolute_import, print_function

import logging

from django.core.management.base import BaseCommand
from django.db import close_old_connections
from kale import settings, utils, worker
from kale.queue_info import QueueInfo

logger = logging.getLogger(__name__)

class DjangoWorker(worker.Worker):
    def _on_task_succeeded(self, message, time_remaining_sec):
        super()._on_task_succeeded(message, time_remaining_sec)
        # cleanup db connection pool for each task
        close_old_connections()

    def _on_task_failed(self, message, time_remaining_sec, err, permanent_failure):
        super()._on_task_failed(message, time_remaining_sec, err, permanent_failure)
        # cleanup db connection pool for each task
        close_old_connections()

class Command(BaseCommand):
    help = 'worker'

    def handle(self, *args, **options):
        queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
        # load queues for dequeue
        QueueInfo(
            config_file=settings.QUEUE_CONFIG,
            sqs_talk=None,
            queue_cls=queue_class
        )

        # load queues for enqueue
        queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
        for q in queues:
            if q.simple_name not in QueueInfo._simple_name_queues_map:
                QueueInfo._simple_name_queues_map[q.simple_name] = q

        logger.info('Task worker is running ...')
        DjangoWorker().run()

python manage.py worker と実行すればワーカーが起動されます。

テストで即時実行したい

非同期処理をテストすることはいつそのタスクが実行されるかを制御する必要があって難しいです。そのため、テストの時だけ非同期処理を同期的に実行したくなります。

残念ながら ndkale にはテスト時にだけ同期的に処理する仕組みはないですが、以下のように publish_now() クラスメソッドを Task に追加することで実現できます。

class BaseTask(task.Task):
    @classmethod
    def publish_now(cls, app_data, *args, **kwargs):
        tsk = cls({})
        tsk.run(*args, **kwargs)

テスト時には、publish() クラスメソッドを publish_now() に差し替えます。

class SampleTask(BaseTask):
    pass

from unittest import mock, TestCase

class SampleTests(TestCase):
    @mock.patch('SampleTask.publish', SampleTask.publish_now)
    def test_sample(self):
        SampleTask.publish()

バグを直す

ndkale の内部実装を眺めているうちにいくつかのバグに気づいたので本家のリポジトリにプルリクエストを送りました。いずれもいいレスポンスをもらえたのでそのうちマージされると思います。

github.com

メッセージの削除の順序が間違っていてデータロストする可能性があった課題を解決したプルリクエストです。

github.com

タスクの即時実行ができなくなってしまっている課題を解決したプルリクエストです。後方互換性のためにもう少し工夫する必要があるみたいですが、そのうち直します。

最後に

さて、かなり長くなってしまいましたが ndkale を紹介しました。

とても品質の高いプロダクトなので内部実装を読んで勉強してみるのもいいと思います。あと、Celery を SQS で使うのはやめることをおすすめします。

golang.tokyo #28 で登壇して賞品をいただきました

限りある時間を大切に。どうも、かわしんです。情報洪水の現世をスマートに生きていきましょう。

さて、去る 12 月 4 日に開催された 「golang.tokyo #28 ~年末だよ!Go大忘年会&LT大会!~」 で LT をしてきました。

golangtokyo.connpass.com

発表資料はこれです。

半年前に作った Twilter の話で、なぜ作ったのかと工夫したフィルターの仕組みについて解説しました。

内容としては以前の記事の内容を濃縮した感じです。

kawasin73.hatenablog.com

この LT 大会では最後に参加者全員で投票してよかった LT を決める時間があり、その中で第4位に選んでいただきました。

1位から賞品を選んでいったのですが、僕が一番欲しかった「Go 言語による並行処理」のオライリー本をいただくことができました。ありがとうございます。

f:id:kawasin73:20191214161253j:plain

最後に発表中の反応ツイートを引用して筆をおきたいと思います。

Go でトランザクションをフルスクラッチで実装した

一歩ずつ一歩ずつ前へ進んでいく、確実に。どうも、かわしんです。

到底 1 記事に収まるような内容ではなく長いので、トランザクションの作り方に興味のない方は途中の「なぜ Go なのか」まで読んでいただければ嬉しいです。

この記事は、Go2 Advent Calendar 2019 の 7 日目と セキュリティキャンプ 修了生進捗 #seccamp OB/OG Advent Calendar 2019 の 7 日目を兼用しています。

さて、僕の興味は必要になったライブラリやミドルウェアなどを自作して、作りたいプロダクトを完成させることです。必要なコンポーネントがないからといってプロダクトを作るのを諦めたり妥協したりはしたくありません。

多くのアプリケーションではデータベースは重要なコンポーネントです。大抵のアプリケーションは MySQL や Postgres、Redis など既存のデータベースシステムで対応することができますが、既存のデータベースシステムでは対応できないような場合に備えてデータベースシステムを作れる力をつけておきたいと思っていました。

そこで、今回はデータベースシステムを作る前哨戦として トランザクション を実装します。出来上がったものはこちらです。

github.com

僕は、今年の セキュリティ・ネクストキャンプ に参加したのですが、同時開催されていたセキュリティキャンプのデータベースゼミの講師である 星野さん にお願いして講座で利用された教科書をいただき、キャンプが終わってからその教科書を参考に設計と実装を行いました。

どんなトランザクションを作るのか

トランザクションとは ACID を満たして複数のデータの書き込みや読み込みをできるようにする仕組みだと僕は認識しています。

ACID は、A (Atomicity : 不可分性)、C (Consistency : 一貫性) 、I (Isolation : 独立性)、D (Durability : 永続性) ですね。

ここでは詳しくは説明しないので、以下の資料を読むといいかと思います。

qiita.com

マジモンのトランザクションをガチ実装するのは初心者の僕には大変なので、実装しやすいように色々制約をつけたトランザクションを作ります。

  • KVS (Key Value Store) にする
    • シンプルな操作 INSERT UPDATE READ DELETE COMMIT ABORT の操作だけに対応します。SQL などはありません。
  • 範囲検索はしない
    • インデックスの実装をサボるために範囲検索はスコープ外としています。(あと、S2PL でも助けられます)
  • インメモリデータベースとする
    • 全てのデータはメモリに載る前提です。それより大きなデータには対応しません。
    • Buffer Pool などをサボることができます。(あと、undo ログをサポートしなくてもよくなります)
  • non-deterministic DBMS を想定します
    • deterministic DBMS とはあらかじめトランザクションの処理内容がわかっているものです。例えばストアドプロシージャとかでしょうか。
    • 逆に non-deterministic ではトランザクション内で次にどの処理命令がくるかがわかりません。いつ COMMIT されるのか ABORT されるのかもわかりません。普通はこちらが使われてると思います。
    • non-deterministic DBMS ではトランザクションは ACID の I を担保するために頑張る必要があります。
  • WAL を使う
    • Write Ahead Log の略です。データベースに対する何らかの更新(Write)を行う前にその操作の内容をログとして追記し、更新が反映する途中でプロセスが死んだ場合でもその更新を再開したりロールバックできるようにして、ACID の A と D を実現するための仕組みです。
    • ログは COMMIT ログが永続化された変更は永続化されたとして確定し、プロセスが死んで再開された後に復旧されます(Crash Recovery)。
    • COMMIT ログが永続化される前にプロセスが死んだ場合は WAL ログに書かれた更新の内容は失敗したとして破棄されたり書き戻されたりします。
    • つまり、WAL に COMMIT ログが書かれて初めてクライアントにトランザクションの更新が成功したと返信することができます。
    • 参考 : バッファ管理とWAL - Qiita
    • ログには undo ログと redo ログの 2 種類がありますが、今回は redo ログのみを使います。
  • チェックポイントの作成は起動時と終了時のみ
    • トランザクションが動いているときにチェックポイントを書き出すのはややこしいので今回はサボります。
    • プロセスの終了が遅くなってしまいますが許容します。

このような制約がありつつも、マルチスレッドで ACID を守って正しく動くトランザクションを作ります。

どうやってトランザクションを作るのか

何を作る時でも同じですが、「まず遅くてもいいから正しく動くものを愚直に作り、それに最適化を施すことによって効率的に速く動くようにしていく」 という作り方が鉄則です。

今回は以下の3ステップに分けて実装していきます。

Step 1 は、Trivial Scheduler と呼ぶそうです。同時に1つのトランザクションしか動かないので排他制御も必要ありません。ただ、愚直に実装するのみです。ただし、今後はこの実装に積み重ねていくのでインターフェイスなどはちゃんと考えて、正しく振る舞うように実装します。

Step 2 では、複数のトランザクションが同時に動くので 並行制御 をする必要があります。並行制御アルゴリズムには様々なものがありますが、初心者向けとして S2PL (Strict Two Phase Lock) が教科書で勧められていたのでこれを採用します。

Step 3 では、Step 2 の実装をマルチスレッドで動くようにします。インデックスやデータストアや WAL などは同時にアクセスによるデータ競合を防ぐために 排他制御 をする必要が出てきます。

なぜ Go なのか

今回は Go で実装していくわけですが、その一番大きな理由は goroutine の存在です。

Step 2 の並行制御をするためには並行スケジューラが必要になります。しかし、goroutine は I/O と自動で連携したコルーチンであり、Go のランタイムに並行スケジューラが実装されているため、Go を使うことでこのトランザクションの中で一番大変と思われる 並行スケジューラの実装をサボる ことができるのです。

むしろ、並行スケジューラを実装しないでトランザクションの実装を勉強したと言えるのかどうか怪しいところではありますが、それは C や C++ で実装するときにやるので勘弁して欲しいです。今回はトランザクションの概観をざっくり知るために作っているので。

また、Step 3 でマルチスレッド化するわけですが、goroutine ベースで作るとランタイムが 自動でスレッド間で適切に分散してスケジューリング してくれます。正直あっけないくらい簡単にマルチスレッド対応できてしまいました。

また、Go はミドルウェアなどを プロトタイピングするのに向いている言語 だと思っています。

goroutine もそうですが、map やスライスなど必要なコンポーネントが言語自体や標準ライブラリに含まれていますし、GC がついているので雑にメモリを確保することができます。また、バイト列が []byte として柔軟に扱える上に、多くのインターフェイス[]byte を利用することが想定されている (io.Reader など) ため、ファイル操作やバイナリ処理をするようなミドルウェアの開発に向いています。さらに、そこそこ速いため速さを求められないようなプロダクトではそのままプロダクションで使えてしまいます。

トランザクションを作っていく!

ここでは順を追ってどうやって実装していったかを紹介していきます。全て main.go の1ファイルに収まる 1000 行に満たない小さな実装なので、ぜひコードを読んでみてください。それぞれのステップでブランチがきられています。

Step 1 : シングルスレッドで同時に1トランザクションのみが動く

GitHub - kawasin73/txngo at trivial_scheduler

まず Go で シングルスレッド を実現しなくてはなりません。Go は M:N 方式 (1, 2) で動くため厳密にシングルスレッドでは動きません。しかし、goroutine を実行する主体である processor を 1 つだけに限定すればシングルスレッドと同じように 1 つの goroutine のみが同時に動き、データ競合も発生しません。

// execute on single thread
runtime.GOMAXPROCS(1)

トランザクションは同時に1つしか動かないため入出力インターフェイスも1つだけで十分です。そこで、標準入出力 を使ってターミナルから柔軟にクエリを流し込めるようにしました。

さて、トランザクションは以下のような構成で実現しています。

f:id:kawasin73:20191206180322p:plain

起動時に CheckPoint ファイルから永続化されている 全てのデータが DB に読み込まれ ます。DB は map[string]Record で表されたハッシュマップで、キーからレコードを高速に引いてくることができ、インデックスの役割も兼ねています。

トランザクションの更新した変更は途中で ABORT されて変更が全て破棄される可能性があります。もし COMMIT される前に更新内容を DB に反映していた場合はその内容を元に戻す必要があります。さらに、今後複数トランザクションを同時に動かす時には、MVCC などを導入しないと確定していないデータを他のトランザクションに読まれてしまうダーティリードになってしまいます。

この課題を解決するために トランザクションの更新した変更は Write Set に保存して、COMMIT された後に DB に書き戻す ようにします。

更新処理の中で以下のように key を処理しているのを不思議に思うかもしれません。これは引数で渡された文字列 key が別の文字列の一部であった時、key が DB に保存されることで元の文字列が GC されなくなってしまうことを防ぐためです。明示的に key 文字列のメモリを確保し直して、外部のメモリへの参照を発生させないようにしています。

// reallocate string
key = string(key)

READ ではトランザクション内での変更結果を読む必要があるため、まず Write Set を検査して、なかった場合に初めて DB を読みにいくようにします。READ での検索を高速にするために Write Set は map[string]RecordCache で表されたハッシュマップで実現します。レコードが削除された場合でも DB にはレコードはあるので Write Set に削除されたフラグを持った RecordCache を保持します。

COMMIT 時にトランザクションでの 更新履歴を全てログとして一度に WAL に書き出し ます。本当は更新時に逐一 WAL に書き出して COMMIT 時には COMMIT ログだけを永続化することが望ましいのですが、次のステップで複数のトランザクションから同時に WAL に更新ログを書き込むとログの順序が割り込まれてしまうため、一度に書き出すようにしました。(トランザクション ID を使えば解決しますが、ID の払い出しを考えたくなかったのでこの方法にしています。また、ABORT される内容を WAL に書かないのでちょっと効率的かなとか思ってます)

また、Write Set のハッシュマップでは更新した順番がわからないため、ログの履歴を保持するスライス Logs ([]RecordLog) を用意し、その内容を WAL に書き出します。

ABORT するときは、Write Set の中身と Logs をクリアするだけでトランザクションの変更内容は全て破棄されます。

プロセスの終了時に DB に保存された内容を全て CheckPoint ファイルに書き出します。しかし、ファイルの上書き中にプロセスが突然死すると元々の CheckPoint ファイルの内容が壊れてしまいます。そこで一時ファイルに内容を書き出した後に os.Rename() してアトミックにファイルを差し替えて CheckPoint ファイルを更新します。上書きされたファイルの内容は自動で削除されます。

CheckPoint ファイルへの書き出しが永続化されたら WAL ファイルの内容は全て必要なくなります。(WAL の中の全ての更新履歴が CheckPoint ファイルに反映されているため)そのため、WAL ファイルを os.Truncate() してクリアします。

起動時に WAL ファイルの中身があるときは前回のプロセスが異常終了したことを表します。(正常終了では必ず WAL ファイルはクリアされているため)そのときは Crash Recovery を行います。WAL ファイルの更新履歴を COMMIT ログがあるところまで再実行して CheckPoint ファイルの内容を更新します。


やっていることはこれだけです。これだけの処理をするだけで ACID を守ったトランザクションが出来上がってしまいます。

Step 2 : シングルスレッドで同時に複数のトランザクションが並行に動く

GitHub - kawasin73/txngo at S2PL-single-thread

まず、複数のトランザクションに同時にクエリを送信するために、インターフェイスは標準入出力の他に TCP コネクション をサポートするようにしました。telnet で接続すると標準入出力と同じやり取りを複数同時に行えるようになります。TCP コネクションごとにトランザクション実行主体が作成されます。

複数のトランザクションを並行に動かすために並行制御アルゴリズムである S2PL を導入します。トランザクションはそれぞれ goroutine のなかで並行に動いています。

S2PL とは Two Phase Lock を発展させたもので、あるレコードへの排他ロック(Write Lock)または共有ロック(Read Lock)の獲得を成長相のみで行い、トランザクションの完了(WAL への COMMIT ログの書き込み)後に排他ロックを解放するというものです。共有ロックはトランザクションの完了前に解放することが許されますが、排他ロック・共有ロックのどちらのロックも解放したあとは縮退相となりロックの獲得はできなくなります。

この S2PL を導入するためにそれぞれの操作に以下の処理を追加します。

  • READ するときは Read Lock を獲得
  • INSERT, UPDATE, DELETE をするときは Write Lock を獲得
  • COMMIT するときは以下の順番で処理
    1. 獲得した全ての Read Lock を解放
    2. WAL の書き込みと永続化
    3. Write Set の中身を DB に反映
    4. 獲得した全ての Write Lock を解放
    5. Write Set と Logs をクリア
  • ABORT するときは獲得した全ての Lock を解放

ここで1つ課題が発生します。READ したレコードを更新する場合 の処理は自明ではなく、明確に決める必要があります。

すでに Write Lock を獲得した後は READ をする場合でも更新処理をする場合でも新たにロックを取ることはありません。しかし、Read Lock を獲得した後に更新する場合はそれを Write Lock に変換する必要があります。

Go には、sync.RWMutex という Read Lock と Write Lock が可能な Mutex コンポーネントがありますが、残念ながら Read Lock を Write Lock に変換する方法は提供していません。もし、Read Lock を解放して Write Lock を獲得し直すと別の goroutine で Write Lock 待ちしていたトランザクションにロックを横取りされてしまう可能性があります。

ここで僕が考えた解決策は以下の2つです。

前者は、標準ライブラリの sync.RWMutex だけで実現できるため簡単に実装できます。しかし、ユーザはトランザクション開始時に明示的に Read-Write か Read-Only であるかを宣言する必要があります。正直これは使い勝手が悪いです。そのくらい自動で判別してほしい。

処理パフォーマンスの高速化やデッドロックの回避を理由にしてこの手法を採用するのであればいいのですが、コンポーネント自作が難しいからという理由でユーザに不自由を押し付けることはしたくありません

そこで後者の昇格可能な RWMutex を自作する手法を選択して github.com/kawasin73/umutex を自作しました。詳しくは以下の記事で紹介しています。

kawasin73.hatenablog.com

昇格可能な RWMutex では、Read Lock を獲得した goroutine 同士が昇格しようとすると デッドロック を起こしてしまうという課題がありますが、この umutex ではデッドロックを検知する仕組みが備わっています。

f:id:kawasin73:20191206203651p:plain

ロックはレコードごとに獲得できるようにするために Locker というコンポーネントを追加しました。S2PL では範囲スキャンをした時にロックの穴あきによる予期せぬアクセスを防ぐため、範囲ロックを工夫しないといけないのですが、今回のトランザクションは範囲スキャンをサポートしていないのでシンプルな map[string]*lock で実現しています。レコードごとに複数の Read Lock が起こり得るため参照カウントを持ち不必要なロックオブジェクトを削除しています。

トランザクションでは Write Set にあるレコードは全て Write Lock を獲得していることを表します。同様に Read Lock を獲得していることを表現するために Read Set も追加しました。

Step 1 では、更新内容は Logs と Write Set の両方に保存していましたが冗長であることに気づいたので、Write Set には Logs のインデックス番号だけを保存するように最適化しました。

COMMIT 処理でのログの書き込み中に他のトランザクションの COMMIT 処理が割り込まれるとデータが壊れてしまいます。goroutine は厳密なシングルスレッドではないため、goroutine が COMMIT 処理中にコンテキストスイッチを起こして別の goroutine の COMMIT 処理が割り込まれる可能性があります。3 そのため、COMMIT 処理全体を sync.Mutex によって排他制御するようにしました。


S2PL を導入するために昇格可能な RWMutex を実装するだけで、並行制御されたトランザクションが実装できてしまいました。goroutine の強力さのおかげです。

Step 3 : マルチスレッドで同時に複数のトランザクションが並行に動く

GitHub - kawasin73/txngo at S2PL-multi-thread

最終ステップとしてマルチスレッドで動かすために、runtime.GOMAXPROCS(1) を消します。

あとは、複数のスレッドから同時アクセスされそうな所に排他制御を導入するだけです。Locker と DB へのアクセスをするときに map の更新で競合する場合があるので sync.Mutexsync.RWMutex で保護するようにしました。


まさかのマルチスレッド対応が一瞬で終わってしまいました。goroutine にのっかるだけで同じロジックでマルチスレッドで動くというのはすごいことです。

最後に

無事、マルチスレッドで動く S2PL で並行制御されたトランザクションを完成させることができました。こんなにあっさりできてしまったのは goroutine のおかげです。

ただ、最初に述べたように色々制約をつけてサボっているところが多いです。インデックスにデータ構造を適用したりとか、バッファプールを追加したりとか、チェックポイントの作成を定期的に行ったりとか、WAL をロックを取らずに書き込んだりとか色々改善の余地はあります。

しかし、Go はメモリが枯渇することを想定しない 4 言語である上に、メモリ管理が GC と密に結合しているのでメモリ枯渇への対応であるバッファプールを実装するのはちょっと辛いです。([]byte から特定の型へ unsafe.Pointer などを使ってキャストしても GC の対象外となる)

次は、そういう改善点と並行スケジューラの実装を含めて柔軟な言語である C や C++ で実装してみたいと思っています。

最後になりますが、今回の試みはセキュリティキャンプで教科書を提供していただき、途中でレビューをしてくださった 星野さん のご協力なしには実現できないものでした。改めて感謝申し上げまして、この記事を終わりたいと思います。

Go で昇格可能な RWMutex を実装した

順番は守りましょう。どうも、かわしんです。トランザクションを実装中です。

さて、先日トランザクションの並行制御アルゴリズムである「S2PL (Strict Two Phase Lock)」を実装した 1 のですが、Read オペレーションでは Read Lock を取った後にすぐに解放していました。2 しかし、よくよく考えると 2PL はロックを解放した後にロックを取得することを禁じているため(だからこそ 2 phase なのですが)、実装が間違っていました。

つまり、Read Lock して Read した値はロックを保持し続け、Write するときにロックを Write Lock に昇格させる必要がありました。

この振る舞いを実現するためには、昇格に対応した Read Write Lock の仕組みが必要になります。

Go には sync.RWMutex という goroutine に対応した便利な Read Write Lock の仕組みが用意されていますが、昇格には対応していません。Github を探しても使えそうなライブラリはなかったので自作しました。

github.com

今回はこのライブラリの紹介です。

欲しかったもの

欲しかった機能は以下の通りです。

既存のライブラリ

一応 Go におけるロックの実装を調べたので、既存のライブラリがなぜ使えなかったのかを列挙していきます。

世の中に必要なコンポーネントがないからと、プロダクトを作るのを諦めたり、仕様を捻じ曲げたりするのは嫌いなので、自作しました。

実装

ロックの仕組みと実装方法

Go でのロックの仕組みはこの動画がわかりやすいです。

www.youtube.com

Mutex は以下の 2 段階に分けてロックが取られています。

  1. アトミックな操作による spin lock
  2. goroutine を停止させてキューに詰めて待つ

sync.Mutex で後者の goroutine 同士の待ち合わせの処理は runtimeAPI である runtime_SemacquireMutex() でおこなっているようですが、これは標準ライブラリである sync にだけ公開されているため、実質 goroutine 同士のプリミティブな待ち合わせ処理を僕たち一般の開発者が利用することはできません。

そこで、sync.Mutexsync.RWMutex といった標準ライブラリの Mutex コンポーネントを利用して実装することにしました。

注意すること

ここで昇格可能な Read Write Lock の振る舞いとして注意しておくべきことを紹介します。

1 つ目は、Lock() は Upgrade() を妨げない ということです。Upgrade()sync.RWMutex で愚直に実装しようとすると RUnlock() した後に Lock() をすることになると思います。

func (m *UMutex) Upgrade() {
    m.rwmu.RUnlock()
    m.rwmu.Lock()
}

しかしこの時、別の goroutine が Lock() をして既に取得されている Read Lock が解除されるのを待っているとすると、Upgrade()RUnlock() でロックが横取りされてしまいます。ロックを横取りされないように実装する必要があります。順番は守りましょう

2 つ目は、複数が同時に Upgrade() するとデッドロックしてしまう ということです。Upgrade() は自身以外の全てが Read Lock を手放すまで待ち合わせます。しかし、複数が Upgrade() するということはお互いに Read Lock を手放さないため Dead Lock してしまいます。

品質の低いプロダクトであるなら Dead Lock するようなクエリを発行するユーザの責任だとして、goroutine がリークしようがお構いなしなのかもしれませんが、使い勝手は非常に悪くなります。そのため、デッドロックする場合はタイムアウトや Dead Lock の検出をして自動で失敗させるなどの Developer Friendly な振る舞いをすることが望まれます。

実際のコード

実装はそんなに難しくなくて 51 行とシンプルです。 コミットログをたどるとわかりますが、ちょうど 1 時間で実装していました。(実装開始時の設計では順番抜かしがあることが発覚して途中で設計をやり直しているので時間がかかりました)

前述の注意する点ですが、

  • Lock の横取り問題は UMutex.u の個数を確認して Upgrade 中であればロックを解放してリトライするようにして解決しました。
  • デッドロック問題は、UMutex.u をアトミックに操作して同時に Upgrade している個数を管理して、既に Upgrade 待ちをしている場合はデッドロックを検出して Upgrade を失敗させることで解決しました。

短いので実際のコードも貼り付けておきます。参考までに。

package umutex

import (
    "sync"
    "sync/atomic"
)

// UMutex is simple implementation of Upgradable RWMutex.
type UMutex struct {
    rwmu sync.RWMutex
    u    int32
}

// RLock locks shared for multi reader.
func (m *UMutex) RLock() {
    m.rwmu.RLock()
}

// RUnlock unlocks reader lock.
func (m *UMutex) RUnlock() {
    m.rwmu.RUnlock()
}

// Lock locks exclusively for single writer.
func (m *UMutex) Lock() {
lock:
    m.rwmu.Lock()
    if atomic.LoadInt32(&m.u) > 0 {
        // Upgrade is given priority to Lock, retry lock.
        m.rwmu.Unlock()
        goto lock
    }
}

// Unlock unlocks writer lock.
func (m *UMutex) Unlock() {
    m.rwmu.Unlock()
}

// Upgrade converts reader lock to writer lock and returns success (true) or dead-lock (false).
// If Upgrade by multi reader locker at same time then dead-lock.
// Upgrade is given priority to Lock.
func (m *UMutex) Upgrade() bool {
    success := atomic.AddInt32(&m.u, 1) == 1
    if success {
        m.rwmu.RUnlock()
        m.rwmu.Lock()
    }
    atomic.AddInt32(&m.u, -1)
    return success
}

まとめ

これで昇格可能な Read Write Mutex の Go の実装ができました。次回はとうとうトランザクションを実装します。多分 Go 2 のアドベントカレンダー になると思いますが。

Go : Mutex は channel の 4 倍速い

速ければ速いほど良い。どうもかわしんです。トランザクションを実装中です。

トランザクションの並行処理で S2PL (Strict Two Phase Lock) を Go で実装しようとしているのですが、どうしても昇格可能な Reader Writer Mutex が必要になり、Github にいい実装がなかったので自分で実装しようとしています。

さて、独自の Mutex を実装するにあたり goroutine 同士の待ち合わせは何かを使って実現する必要がありますが、Go には sync.Mutex とチャネルがあります。

どちらとも、ロックしてアンロックするということができます。振る舞いは同じです。

振る舞いが同じとなればどちらが速いかが重要となります。

ということで実験してみました。

実験

環境は以下の通りです。

ベンチマークコード

以下を適当に main_test.go としておきます。

package main

import (
    "sync"
    "testing"
)

func BenchmarkMutex(b *testing.B) {
    var mu sync.Mutex
    var n int
    for i := 0; i < b.N; i++ {
        mu.Lock()
        n++
        mu.Unlock()
    }
}

func BenchmarkChan(b *testing.B) {
    ch := make(chan struct{}, 1)
    var n int
    for i := 0; i < b.N; i++ {
        ch <- struct{}{}
        n++
        <-ch
    }
}

func BenchmarkMultiMutex(b *testing.B) {
    var mu sync.Mutex
    var n int
    var wg sync.WaitGroup
    wg.Add(10)
    for j := 0; j < 10; j++ {
        go func() {
            for i := 0; i < b.N; i++ {
                mu.Lock()
                n++
                mu.Unlock()
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

func BenchmarkMultiChan(b *testing.B) {
    ch := make(chan struct{}, 1)
    var n int
    var wg sync.WaitGroup
    wg.Add(10)
    for j := 0; j < 10; j++ {
        go func() {
            for i := 0; i < b.N; i++ {
                ch <- struct{}{}
                n++
                <-ch
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

BenchmarkMulti****() は、10 の goroutine で同時に動かすものです。

結果

GOMAXPROCS=1 でおこなったものがこれです。

$ GOMAXPROCS=1 GO111MODULE=off go test -bench .
goos: darwin
goarch: amd64
BenchmarkMutex          100000000               10.4 ns/op
BenchmarkChan           28564992                40.5 ns/op
BenchmarkMultiMutex      9439315               126 ns/op
BenchmarkMultiChan       1000000              1189 ns/op
PASS
ok      _/Users/kawasin73/work/sample-go/rwlock 4.808s

普通のマルチスレッドでおこなったものがこれです。

$ GO111MODULE=off go test -bench .
goos: darwin
goarch: amd64
BenchmarkMutex-8                100000000               10.7 ns/op
BenchmarkChan-8                 27600146                42.3 ns/op
BenchmarkMultiMutex-8            2536651               494 ns/op
BenchmarkMultiChan-8              736453              1710 ns/op
PASS
ok      _/Users/kawasin73/work/sample-go/rwlock 5.307s

まとめ

sync.Mutex の方がチャネルよりだいたい 4 倍速いです。チャネルは内部でロックを取った上でごにょごにょやっているので遅いのは当たり前です。

また、シングルスレッドにするとスレッド同士の待ち合わせが無くなったりシングルスレッド用の最適化があるのかどうかはわかりませんが、速くなります。速くなるというよりは、マルチスレッドで動かすと遅くなるという表現の方が理解しやすそうですが。

シンプルな昇格可能な RWMutex を作る分には sync.Mutex を使う方向でできそうです。

ただ、タイムアウトをさせようとすると Go では必ずチャネルを使わないといけなくなりそうなのが残念です。

MySQL は Rigorous ではない

トランザクションは慎重に。どうも、かわしんです。今トランザクションを実装しています。

タイトルは釣りです。MySQL のデフォルトのトランザクション分離レベルである REPEATABLE READ での話です。後、確かめた訳ではないですが MySQL に限った話ではないです。

さて、トランザクションの用語に Strictness (ST) と Rigorous (RG) というものがあります。詳しくはこの記事を読むとヒントになるかもしれません。

トランザクションの Strictness と Rigorousness の定義を再確認する - ぱと隊長日誌

これら2つの違いはあるトランザクションがあるレコードを Read した時に、Strictness は別のトランザクションでも そのレコードに Write できる のに対して、Rigorous は Read したトランザクションが Commit か Abort するまで Write がブロックされる かの違いです。Rigorous の方が Strictness よりも厳密です。

つまり 2PL の文脈でいえば、内部的に取得した Read Lock を Commit/Abort 前に解放するかどうかの違いになります。

今回はこれを実際の MySQL で試して確認し、実際のアプリケーションでのユースケースで気をつけるべきところを紹介します。

実験をしてみる

今回試したのは、MySQL 8.0.15 です。Docker で動かしています。

テーブルは以下の構造で、1つだけレコードを追加しています。

CREATE TABLE hoge (
  id INT NOT NULL,
  value INT NOT NULL default 0,
  PRIMARY KEY (id)
);
INSERT INTO hoge (id, value) VALUES (1, 1);

2 つのコネクションを作成しそれぞれでトランザクションを作って id = 1 のレコードの value をカウントアップする実験をしていきます。

操作は全て mysql クライアントコマンドで行っています。

普通に値を更新して Rigorous でないことを確かめる

2 つのコネクションからそれぞれ以下の SQL を同時に1行ずつ実行します。

BEGIN;
SELECT * FROM hoge WHERE id = 1;
UPDATE hoge SET value = 2 WHERE id = 1;
SELECT * FROM hoge WHERE id = 1;
COMMIT;

片方で BEGIN した後にもう片方で BEGIN をして ... と行った具合です。

まず、両方のトランザクションid = 1value は 1 であるとわかります。

mysql> SELECT * FROM hoge WHERE id = 1;
+----+-------+
| id | value |
+----+-------+
|  1 |     1 |
+----+-------+
1 row in set (0.00 sec)

そこで両方のトランザクションでその値に 1 を加えた 2 を更新します。

mysql> UPDATE hoge SET value = 2 WHERE id = 1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> SELECT * FROM hoge WHERE id = 1;
+----+-------+
| id | value |
+----+-------+
|  1 |     2 |
+----+-------+
1 row in set (0.00 sec)

片方のトランザクションはこのように更新に成功します。少なくとも SELECT したトランザクションがある中で UPDATE が成功しているため、Rigorous ではない ということがわかります。

更新のブロッキングと奇妙な挙動

さて、片方のトランザクションですが、ブロックされレスポンスが返ってきません。

そして、更新に成功したトランザクションCOMMIT するとブロックされたトランザクションが動き始めます。

mysql> UPDATE hoge SET value = 2 WHERE id = 1;
Query OK, 0 rows affected (7.08 sec)
Rows matched: 1  Changed: 0  Warnings: 0

mysql> SELECT * FROM hoge WHERE id = 1;
+----+-------+
| id | value |
+----+-------+
|  1 |     1 |
+----+-------+
1 row in set (0.00 sec)

しかし、値の更新に失敗 します。Changed: 0 となっていることからも更新に失敗していることがわかります。

ここで別の値(例えば 3 とか)に更新すると更新に成功します。

なぜ同じ値だと更新に失敗して違う値だと更新に成功するのかがよくわかりません 。両方成功していいはずです。

これはあくまでも僕の想像ですが、同じ値に更新するということは前の値に対して同じ相対的な操作(この場合はカウントアップ)を行っている可能性が高いです。そのため、意図せぬ競合状態によるデータの不整合が発生している可能性が高く、そのバグに利用者が気付きやすくするためにあえて更新を失敗させているのではないかと思いました。

もしカウントアップなどの相対的な操作ではなく、両方ともその値にするという絶対的な操作であった場合でも更新に失敗しても最終的な値はその値になっているため、データが壊れるということはなくデメリットは小さいです。

こういう利用者のミスに優しい MySQL の親切設計なのではないかと推察しました。

逆に、片方では +1 して片方では +2 するみたいなユースケースでは更新に失敗しないため注意が必要です。

正しくカウントアップを成功させる

SELECT した値に操作をして UPDATE しても更新に失敗することがわかりました。ではどうすれば正しく値のカウントアップができるのでしょうか?

方法は2つあります。

  • ロックをとる
  • UPDATE 文を工夫する

1つは SELECT するときに ロックをとる 方法です。MySQL では、SELECT 文の末尾に FOR UPDATE をつけることでロックを取得できます。ロックを取得したトランザクションが Commit/Abort するまで別のトランザクションのロックはブロックされ、ロックを獲得できた時には更新された値を取得することができます。

SELECT * FROM hoge WHERE id = 1 FOR UPDATE;

もう1つは UPDATE 文を工夫する というものです。値の相対的な操作がカウントアップなど単純な場合は、value = value + 1 とすると相対的な操作がされて更新されます。

UPDATE hoge SET value = value + 1 WHERE id = 1;

別のトランザクションで更新された値に対して加算が行われるため、MySQL の REPEATABLE READ のトランザクション分離レベルであっても、以下のように 1SELECT された値に 1 を足したら 3 になったみたいな一見直感に反する結果になります。

mysql> BEGIN;
Query OK, 0 rows affected (0.00 sec)

mysql> SELECT * FROM hoge WHERE id = 1;
+----+-------+
| id | value |
+----+-------+
|  1 |     1 |
+----+-------+
1 row in set (0.00 sec)

mysql> UPDATE hoge SET value = value + 1 WHERE id = 1;
Query OK, 1 row affected (1.16 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> SELECT * FROM hoge WHERE id = 1;
+----+-------+
| id | value |
+----+-------+
|  1 |     3 |
+----+-------+
1 row in set (0.00 sec)

mysql> COMMIT;
Query OK, 0 rows affected (0.00 sec)

SERIALIZABLE ではどうなるか

MySQL は Rigorous ではないと大見得を切ってしまいましたが、MySQL でもトランザクション分離レベルを SERIALIZABLE にすると Rigorous になります。

SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;

2 つのトランザクションSELECT した上で UPDATE するとブロックされて処理が返ってきません。

そのあとに両方のトランザクションUPDATE するとデッドロックしてしまいます。

mysql> UPDATE hoge SET value = 14 WHERE id = 1;
ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction

SERIALIZABLE は厳密にはなりますが速度が遅くなってしまうため大量の処理を捌くには向いていないです。

まとめ

MySQL の Rigorous の挙動を調べるために色々脱線しましたがまとめると以下のようになります。

  • MySQL のデフォルトのトランザクション分離レベルである REPEATABLE READ では Rigorous ではない
    • ただし同じ値に対して同じ操作をしたと思われる場合は親切にも MySQL は更新を失敗させる
  • 相対的な値の操作をする場合は SELECT ~~ FOR UPDATE してロックをとるか、UPDATE ~~ value = value = 1SQL 内に相対的な操作を記述する。
  • SERIALIZABLE は Rigorous になるが遅いし、容易にデッドロックする

アプリケーションのユースケースとしては、ポイントの加算など値の相対的な操作をすることはよくあると思います。

その際は必ずロックを取るか SQL 内に相対的な操作を記述することでエラーのない整合性の取れたデータ更新ができるようになります。

WEBエンジニア勉強会 #15 で登壇しました

プログラマーあるある、なにかと独自のミニ言語を作りがち。どうも、かわしんです。

昨日、11 月 16 日に開催された WEBエンジニア勉強会 #15 の LT 10 分枠で「ネストした JSONCSV に自動変換する Python ライブラリを作った」というタイトルで登壇してきました。

スライドは以下です。

ネストした JSON を CSV に自動変換する Pythonライブラリを作った - Google スライド

発表の内容自体は、以前の記事の中身をわかりやすく解説しました。

kawasin73.hatenablog.com

個人的には、この発表では Step 1 ~ 6 に分けて段階的に JSON から CSV へ変換する文法を育てていくところが見どころです。

他の方の発表では、javascript の HTTP クライアント axios を Typescript の世界で型安全にするために、規約づけされた型定義ファイルから axios ラッパーを生成するツール「aspida」が面白いなと思いました。

VS Code での補完が効くことで開発効率が上がりそうです。また、ただのツールにはとどまらず、型定義ファイルとそのラッパーを npm パッケージとして @aspida/<domain> などのように公開することで Typescript での @types/<domain> のような世界を作り上げる野望をぶち上げており、その世界観に感動しました。

発表スライドが公開されていないのが残念ですが(スライドを見ないとあの感動は伝わらないと思う)、Github リポジトリは以下です。某大手の内部で採用されているらしいので乗っかってみるのもいいと思います。

github.com

あと、独自のフルスクラッチ javascript View ライブラリ「Ma_gician(仮)」の発表も面白かったです。Yet Another Vue.js なのかなという印象を受けましたが、とにかくコーディング量を減らすための独自構文を HTML タグに埋め込むことで少量のコードで Vue.js と同じことができるらしいです。

また、他のライブラリからの乗り換えも最小限でコストでできるような工夫もされているらしく、ひしひしと野心を感じました。まだ、リリースはされておらず絶賛開発中で、正式名称やコードは公開されてませんが、今後に期待です。

作者の方 の記事を読むと、外部依存なし、パフォーマンスを妥協しないなど、僕の好きな感じだったので注目していきたいと思います。

docs.google.com

最後に発表中のツイートを引用して筆をおきたいと思います。