kawasin73のブログ

技術記事とかいろんなことをかくブログです

原因不明のエラーなどない

お前のいう想定外はただの調査不足。どうも、かわしんです。新年度が始まり周りのみんなは働き始めているのを感じますが、僕は4月6日入社なのでもう少しモラトリアムを満喫します。

さて、去年開催された GoCon 2019 Spring で「エラー設計について / Designing Errors」という発表がありました。

docs.google.com

この発表ではエラーを「既知のエラー」「未知のエラー」と分類し、アプリケーションを動かしながら発生した「未知のエラー」に対するハンドリングを追加しながらどんどん「既知のエラー」に変換していくことで、ハンドリングされないエラーを減らしていこうというアプローチを紹介していたものと僕は認識しています。

しかし、具体的にエラーとはどこから発生するのかという説明がなかったため、いつどこから発生するかわからないエラーと戦い続ける必要があるようにも誤解される方がいるかもしれないと思ったのでこの記事を書くことにしました。

結論からいうと、全てのエラーは事前に予測可能であり恐れる必要はありません

前提

ここでは OS の上で動くソフトウェアのプログラミングを対象として扱います。C 言語とか Go とかの高級言語でのプログラミングです。

カーネルの中とか FPGA などのハードウェアプログラミングは対象としませんが本質は同じではないかなと思っています。

エラーの実態

エラーはプログラミング言語や書き手によって表現方法が違いますが、大抵の言語では「返り値」「例外」「シグナル」のどれかまたは複数で表現されます。どれを使うのがいいかということについては色々な考え方があると思うのでこの記事では触れません。

エラーはどこで発生するのか

我々がプログラミングをする上で扱うエラーの発生源は大きく以下の3つに分類できます。

  • 自分で定義して作り出すエラー
  • 関数を呼び出した時に発生するエラー
  • その他の実行時エラー

「自分で定義して作り出すエラー」は内部状態に対して入力値が異常である場合に処理を中断して発生させるエラーです。例えば、入力値のバリデーションエラーであったり、残高がないのにお金を引き出そうとしていた時のエラーなどがわかりやすいと思います。

「関数を呼び出した時に発生するエラー」は呼び出した関数の中で定義されて作り出されたエラーであったり、その関数がさらに別の関数を呼び出して発生したエラーであったりします。これらエラーはそのまま呼び出し元に返されることもありますし、別のエラーに変換されて返されることもあります。

さらに関数が呼び出した関数によるエラーを突き詰めて辿っていくと、全て「関数の作者が定義して作り出すエラー」と「システムコールから発生するエラー」にたどり着きます。システムコールは OS 上で動くソフトウェアとカーネルの境界をつないでいる命令で、 C 言語や Go などの大抵のプログラミング言語では関数の形で定義され、関数呼び出しと同じように扱うことができます。

システムコールがわからない人はコンピュータサイエンスの基本なので調べてみてください。簡単に説明すると、普通のプログラマが書くプログラムは「ユーザランド」という安全な領域で動き基本的にはメモリ演算しかできず、ネットワーク通信やファイル I/O、ディスプレイの出力、ハードウェア機器の操作などは悪いプログラムに乗っ取られるのを防ぐために全て「カーネルランド」という隔離された領域でしか実行できません。しかし、ユーザランドでしか動かないソフトウェアは何もできないので、ユーザランドは「システムコール」を発行してカーネルランドに処理の実行を依頼するという形になっています。

「その他の実行時エラー」は、不正な CPU 命令や不正なメモリアクセスを想定しています。これらが発生した場合は CPU 例外が発生し OS で定義された例外ルーチンが実行されアプリケーションは停止されたりシグナルが飛んできたりします。(詳しくないので間違っていたらすみません。)しかし、これらのエラーは大抵のメモリ安全な高級言語では仕組み上そもそも発生しえないものです。逆に発生するとデバッグは大変です。

また、ゼロ除算も実行時エラーです。言語によってはゼロ除算をした時の挙動は未定義だったりしますが言語によってはゼロ除算をするときにエラーを発生させてくれます。

全てのエラーは定義されている

さて、エラーの発生源を辿っていくと全て自分か誰か(ライブラリの作者など)が定義したエラーかシステムコールから発生するエラーにたどり着きました。

自分か誰か(ライブラリの作者など)が定義したエラーは自明に定義されていますが、システムコールから発生するエラーも全て定義されています。

例えば外部とのネットワーク通信で使うソケットの読み取り命令 recv を見てみましょう。recv man page とググれば以下の Linux のマニュアルが出てきます。または、ターミナルで man 2 recv と実行すると実行した OS のマニュアルが読めるはずです。

https://linuxjm.osdn.jp/html/LDP_man-pages/man2/recv.2.html

ここにはエラーの章があり、全ての発生しうるエラー(EAGAIN EWOULDBLOCK EBADF ECONNREFUSED EFAULT EINTR EINVAL ENOMEM ENOTCONN ENOTSOCK)とそのエラーがどのような時に発生するのかが定義されています。逆に言えばここに定義されていないエラーは絶対に発生しません。

ここまでで、原因不明のエラーなど存在せず全てのエラーが定義されているということがわかったと思います。

エラーハンドリング

さて、(理想的には)全ての関数から発生するエラーは全て定義されているのでそれに対するエラーハンドリングをしていけばいいということになります。

ここでエラーは大きく2つに分類されます。「ハンドリングする(回復可能な)エラー」か「ハンドリングしない(回復不能な)エラー」です。全て定義されているのでそこに想定外のエラーなどありません。

その関数のレイヤーで回復可能な場合はエラーハンドリングをして処理を続行し、そのレイヤーで回復不能な場合はそのエラーを呼び出し元に返し上のレイヤーでエラーハンドリングされることを期待します。

発生したエラーを全て上のレイヤーに戻していればプロセスは異常終了してしまいます。要件の厳しくないアプリケーションサーバなどであれば異常終了させて再起動して後でツギハギの対応をすればいいかもしれませんが、データベースサーバなど一度起動したら数年単位で落ちることが許されないソフトウェアであれば実装時に1つ1つのエラーを精査して回復可能なエラーを拾っていきます。想定外のエラーはただの調査不足です。

しかし現実

しかし、現実はそんなに簡単ではありません。ライブラリから発生するエラーがドキュメントなどで明確に定義されていない場合があります。その場合は冒頭で紹介したような「未知のエラー」を「既知のエラー」に変換していくアプローチが有効だと思います。

一方でもう1つ解決策があります。それは「ライブラリを使わない」ことです。ライブラリを追加すれば追加するほど自分が理解していないコードが増え、把握できていないエラーが発生する可能性が高まります。関数ごとのエラーが定義されていないライブラリは使わずに自分でシステムコールの上にエラー定義されたライブラリを築き上げていくのも有効な解決策だと思います。大変ですが。

言いたかったこと

  • 関数を作るときは発生し得るエラーを定義しろ
  • 全てのエラーを精査して、必要なハンドリングをしろ
  • エラーは全て定義されていて予測可能である。恐れる必要はない

番外編:境界をずらす

エラーの発生源は境界ごとに定義されていることで想定外のエラーのないソフトウェアを作ることができます。アプリケーションとライブラリとの境界(ライブラリ関数呼び出し)、ユーザランドカーネルランドの境界(システムコール)を今回の記事では紹介しました。システムコールについては完全にエラーが定義されドキュメント化されています。

さらに、システムコールの実装の中身をカーネルの中に辿っていくと同様に「カーネルの中で定義されたエラー」と「ハードウェアから発生するエラー」にたどり着きます。ハードウェアから発生するエラーはソフトウェアとハードウェアの境界で定義されたエラーです。

このように境界ごとに明確にエラーを定義することで想定外のないソフトウェアが実現可能になります。

番外編:ハードウェア障害

この記事では暗黙の前提として、ハードウェアが正しく動くことを前提としていました。しかし、ハードウェアも壊れることがあります。メモリが破壊されたり、ストレージが破壊されたり。これらの障害は OS のレイヤーで検知されてエラーとして発生することもありますが、アプリケーションが管理している領域のメモリの値が変わってしまったり、1 + 1 を 3 と CPU が計算してしまったような場合には対応が難しいです。エラーハンドリングの処理自体が正しく動くかわかりません。僕は、普通のソフトウェアを作っている場合はこのような状況はどうしようもないので異常終了するべきだと思いますが、世の中にはこのようなコンピュータが正しく動かないことも想定したソフトウェアの世界があるのかもしれません。(ないのかもしれません)

失敗した AWS Batch ジョブを Slack に通知する (Terraform を使って)

サーバーレスでピタゴラスイッチ。どうも、かわしんです。イベントをサーバレスで繋げてピタゴラスイッチを作るのって案外楽しいもんですね、GUI コンソールで作ってる限りは。

さて、今回は AWS Batch のジョブ実行が失敗した時に Slack に通知する機能を作りたかったのですが、断片的な記事しか見当たらなかったのでこの記事でまとめようと思います。また、今回はインフラ構築ツールとして Terraform を使います。

多分、断片的な記事を普通に繋げてると動かないハマりポイントがあるので、後学の為に注意喚起するという目的もあります。

全体のアーキテクチャ

全体の流れはこんな感じでイベントを繋げていきたいと思います。

Batch -> CloudWatch -> SNS -> Lambda -> Slack

AWS Batch ではジョブの状態が変わるたびにイベントが発生します。 CloudWatch Event Rule を設定してイベントの中から FAILED になったイベントのみをフィルタリングして SNS Topic に流します。

AWS SNS はイベントを受け取ったら Lambda を起動して、Lambda に登録した Node のコードがイベントを整形して Slack に Webhook を叩いて通知するという流れです。

CloudWatch から Lambda を直接起動することもできますが、SNS を経由することで Slack の他にもメールなどの他のチャネルへの通知を拡張することができる為、SNS を経由することにしました。

今回は、この CloudWatch と SNS と Lambda を Terraform を使ってセットアップしていきます。

Lambda で動かすパッケージ

さて、CloudWatch のイベントを Slack へ通知する Lambda Function は標準では用意されていません。イベントの JSON を整形して Slack のコメントとして Webhook を叩く処理を実装する必要がありますが、一から書くのは面倒です。

そこで、aws-to-slack というツールスタックを利用することにしました。

github.com

aws-to-slack は CloudWatch や Lambda のセットアップまでを含めて make deploy を使って簡単に完了させることができるツールスタックです。しかし、AWS リソースのセットアップには CloudFormation を使っている為 Terraform との相性は悪く、勝手に AWS リソースを作られるのも気に入りません。

make package コマンドを実行すれば Lambda で実行する Node のパッケージがコンパイルされる為、Lambda 上で動かすコードだけを aws-to-slack では利用することにしました。

事前に以下のコマンドを実行してコンパイル結果である release.zip を生成します。

$ git clone https://github.com/arabold/aws-to-slack.git
$ cd aws-to-slack
# 生成物 release.zip ができる
$ make package

aws-to-slack ディレクトリに生成された release.zip を任意の S3 バケットにアップロードしておきます。

構築する Terraform

S3

S3 にアップロードした aws-to-slack の release.zip を参照する為に data.aws_s3_bucket_object.lambda_to_slack を作ります。

/*
 * https://www.terraform.io/docs/providers/aws/d/s3_bucket_object.html
 */
data "aws_s3_bucket_object" "lambda_to_slack" {
  bucket = "<bucket_name>"
  key    = "<key_prefix>/release.zip"
}

IAM Role

aws-to-slack の CloudFormation で指定されているような IAM ロールを作ります。

/*
 * https://www.terraform.io/docs/providers/aws/r/iam_role.html
 */
resource "aws_iam_role" "iam_for_lambda" {
  name = "iam-for-lambda"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

/*
 * https://www.terraform.io/docs/providers/aws/r/iam_role_policy_attachment.html
 */
// https://github.com/arabold/aws-to-slack/blob/1451c1beae7b8f635c42161587bddbce04442857/cloudformation.yaml#L68-L70
resource "aws_iam_role_policy_attachment" "iam_for_lambda1" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy_attachment" "iam_for_lambda2" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/CloudWatchReadOnlyAccess"
}

resource "aws_iam_role_policy_attachment" "iam_for_lambda3" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/AWSCodeCommitReadOnly"
}

Lambda

Lambda のセットアップをします。Lambda Function の具体的な設定は aws-to-slack の CloudFormation を参考にしています。

Slack の Webhook URL は https://my.slack.com/apps/manage/custom-integrations にアクセスして生成します。

/*
 * https://www.terraform.io/docs/providers/aws/r/lambda_function.html
 */
// https://github.com/arabold/aws-to-slack/blob/1451c1beae7b8f635c42161587bddbce04442857/cloudformation.yaml#L46-L63
resource "aws_lambda_function" "lambda_to_slack" {
  s3_bucket         = data.aws_s3_bucket_object.lambda_to_slack.bucket
  s3_key            = data.aws_s3_bucket_object.lambda_to_slack.key
  s3_object_version = data.aws_s3_bucket_object.lambda_to_slack.version_id
  function_name     = "lambda-to-slack"
  handler           = "src/index.handler"
  role              = aws_iam_role.iam_for_lambda.arn
  memory_size       = 256
  runtime           = "nodejs10.x"
  # Cross-region metrics lookup requires at least 10s
  timeout = 15
  environment {
    variables = {
      SLACK_CHANNEL  = "<channel_name>"
      SLACK_HOOK_URL = "https://hooks.slack.com/<hook_api_path>"
    }
  }
}

/*
 * https://www.terraform.io/docs/providers/aws/r/lambda_permission.html
 */
resource "aws_lambda_permission" "sns_to_lambda_to_slack" {
  statement_id  = "lambda-to-slack"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_to_slack.function_name
  principal     = "sns.amazonaws.com"
  source_arn    = aws_sns_topic.cloudwatch_slack.arn
}

CloudWatch

resource.aws_cloudwatch_event_rule.batch_failed_job を設定して Batch から発生するイベントをフィルタリングします。これは以下の AWS 公式のチュートリアルを参考にしました。

参考 : チュートリアル: 失敗したジョブイベントに関する Amazon Simple Notification Service アラートを送信する

/*
 * https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_rule.html
 */
resource "aws_cloudwatch_event_rule" "batch_failed_job" {
  name = "batch-failed-job"

  event_pattern = <<PATTERN
{
  "detail-type": [
    "Batch Job State Change"
  ],
  "source": [
    "aws.batch"
  ],
  "detail": {
    "status": [
      "FAILED"
    ]
  }
}
PATTERN
}

/*
 * https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_target.html
 */
resource "aws_cloudwatch_event_target" "sns_slack" {
  rule = aws_cloudwatch_event_rule.batch_failed_job.name
  arn  = aws_sns_topic.cloudwatch_slack.arn
}

SNS

SNS の設定はこんな感じになります。resource.aws_sns_topic.cloudwatch_slack.policy については後述します。

/*
 * https://www.terraform.io/docs/providers/aws/r/sns_topic.html
 */
resource "aws_sns_topic" "cloudwatch_slack" {
  name = "cloudwatch-to-slack"

  // https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/resource-based-policies-cwe.html#sns-permissions
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:<region_name>:<account_id>:cloudwatch-to-slack",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account_id>"
        }
      }
    },
    {
        "Sid": "TrustCWEToPublishEventsToMyTopic",
        "Effect": "Allow",
        "Principal": {
            "Service": "events.amazonaws.com"
        },
        "Action": "sns:Publish",
        "Resource": "arn:aws:sns:<region_name>:<account_id>:cloudwatch-to-slack"
    }
  ]
}
POLICY
}

/*
 * https://www.terraform.io/docs/providers/aws/r/sns_topic_subscription.html
 */
resource "aws_sns_topic_subscription" "cloudwatch_slack" {
  topic_arn = aws_sns_topic.cloudwatch_slack.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.lambda_to_slack.arn
}

ハマりどころ:SNS トピックの Policy

さて、最初の時点では aws_sns_topicpolicy は指定していませんでした。その状態で試してみると、CloudWatch Event Rule から SNS Topic への送信に失敗します。

さらに、AWSGUI コンソールから CloudWatch Event Rule を作ると Terraform で作ったルールと主導で作ったルールの両方でイベントの送信に成功するようになります。そのあとに主導で作ったものを削除して Terraform で作ったものだけに戻すと失敗するようになります。

ここで Terraform の aws_cloudwatch_event_target のドキュメントをよく読むと以下のように注意書きがされています。

Note: In order to be able to have your AWS Lambda function or SNS topic invoked by a CloudWatch Events rule, you must setup the right permissions using aws_lambda_permission or aws_sns_topic.policy . More info here .

https://www.terraform.io/docs/providers/aws/r/cloudwatch_event_target.html

つまり、SNS トピックに送信する場合には aws_sns_topicpolicy を指定する必要があるということです。しかし、具体的に何を書けばいいのかは教えてくれず無造作に AWS のドキュメント へのリンクが置かれているだけです。

このドキュメントを読むと、一度 SNS トピックを作った後でその Policy に以下の Policy を追加するように書かれていました。

{
  "Sid": "TrustCWEToPublishEventsToMyTopic",
  "Effect": "Allow",
  "Principal": {
    "Service": "events.amazonaws.com"
  },
  "Action": "sns:Publish",
  "Resource": "arn:aws:sns:region:account-id:topic-name"
}

そこで実際に作成した SNS トピックの Policy を調べて前述の resource.aws_sns_topic.cloudwatch_slack.policy が完成しました。

しかし、この Policy の中には SNS トピックの ARN が含まれています。Terraform のように1度のオペレーションで Immutable にリソースを作成する場合は事前にリソースの ARN を参照することは自己参照になりできません。

つまり、これを綺麗に Terraform で実現することはできないのです。だから、Terraform のドキュメントでは具体的な手法を説明するのではなく無造作にリンクが置かれているだけなのだと理解しました。不親切ですが。

一方で、アカウント名と SNS トピックの名前がわかっている場合はリソースの生成前に ARN を予測することができるため、今回は直接指定して事なきを得ました。

最後に

これで AWS Batch の失敗ジョブを Slack に通知することができるようになりました。初めてサーバーレスというものを使ってみましたが案外面白かったです。

ただ、GUI だと裏側でよしなにやっているごにょごにょした部分を自分で実装する必要があり Terraform などを使うと案外めんどくさいことがわかりました。

AWS Batch はジョブ起動のレイテンシーを考えない場合はコスト低くできる(低いとはいってない)方法なので使ってみてはいかがでしょうか?(AWS Batch ジョブの SSM 連携がないのでハマりました)

SQS を使った Python の非同期ワーカーは ndkale しかない

誰一人見捨てない!!! どうも、かわしんです。Celery は見捨てるんです。

この記事は Pythonその2 Advent Calendar 2019 の 15 日目の記事です。

やや強めのタイトルですが、AWS SQS を使った非同期ワーカーでまともな実装は ndkale しかないという内容です。Celery は論外です。

github.com

前半はディスってばっかりなので、ndkale のことだけを知りたい場合は途中の「大本命 ndkale」から読んでください

前提としての欲しい機能

まず、諸々をディスる前に非同期ワーカーとして欲しい機能をあげておきます。

  • 正しく SQS を使って信頼性のあるタスク実行をする
  • 即時再実行をする
  • 複数のキューを使い分ける。また同じタスクでも動的に利用するキューを切り替えたい
  • Dead Letter Queue も使えると嬉しい

まず Celeryディスる

Python の非同期処理基盤として有名な OSS プロダクトとして Celery があります。

それなりにたくさんの人が使っていて、良さげな雰囲気があります。

Celery では Broker のうち RabbitMQRedisSQS の 3 つを Stable としてサポートしています。

Brokers — Celery 4.3.0 documentation

そこで SQS を Celery で使って試していました。

Celery のダメなところ

当たり前ですが、OSS を使うときはその中身のコードをざっと読んで本番環境で使える品質のものなのかを確かめます。

しかし、Celery は内部のコードが全く読めません 。どこが起動時のエンドポイントなのか、どこで Broker を切り分けているのか、コアとなる処理はどこに書いてあるのか、コードが難解すぎて全くわかりません。いざエントリーポイントを見つけてコードを追っていってもあっという間に呼び出す関数を見失ってしまいます。

ただ、たくさんの人が使っている信頼性があると思われるプロダクトだし、中のコードは読めないけど色々なケースを考慮して作られていることは読み取れたので Celery を使っていました。

Celery + SQS の致命的にダメなところ

Celeryデバッグログをだしながら使っていて気づいたことなのですが、Celery はプロセスの強制終了でメッセージをロスト します。これは信頼性のあるメッセージ伝達を行う SQS を使うワーカーとしては致命的です。もはや本番環境では使えない品質です。

まず、SQS はメッセージの At Least Once での伝達(FIFO キューの場合は Exactly Once)を実現するために以下の手順でメッセージの購読を行うことを想定しています。

  1. メッセージを SQS からタイムアウト付き(可視性タイムアウト)で 取得
  2. メッセージの内容でタスクを実行
  3. タスクが完了したら SQS からメッセージを 削除
  4. もしタスクの再実行が必要な場合は、タイムアウトによって自動的に復帰するのを待つか、タイムアウト0 に更新して即時復帰させる

タスクの実行が完了した後にメッセージを削除するので、タスク実行中にプロセスが突然終了した場合でもメッセージはタイムアウトの時間が経過後に復活して再実行されます。

もちろん、タスクの実行完了とメッセージの削除の間でプロセスが死ぬ場合もあるので、タスクは冪等に作る必要があります。

さて、Celeryデバッグログを観察する限り以下のような順序で処理をしているようでした。

  1. メッセージを SQS から 取得
  2. メッセージを 削除
  3. タスクを実行
  4. もしタスクの再実行が必要な場合は、新しいメッセージを SQS に送信

メッセージを削除するタイミングが早すぎます。もし、タスクの実行中にプロセスが突然終了した場合にはそのタスクは再実行されることなく、データのロストに繋がる可能性があります。

Celery を SQS で使う限りメッセージの信頼性はない です。

これはあくまでもデバッグログの順序を観察して僕が推測したことなので違うかもしれませんが、確かめようにも内部のコードが読めないので仕方ないです。

Celery は複数の Broker に対応しているため Broker のモデルとしての抽象化が難しく、妥協としてこのような実装になっていると思うので仕方ないとも言えます。が、少なくとも本番環境で使えるような品質ではないので Stable と表記するのはやめていただきたいです。

そのほかのライブラリをざっとディスる

さて、本命の ndkale にたどり着くまでにたくさんの SQS を使った非同期ワーカーライブラリを調査してその内部コードを読んだので、それぞれディスっていきます。もちろんそれぞれいいところはあるのですが、そこまで書くと長くなるので採用しなかった理由だけを書いていきます。

大本命 ndkale

さて、調査の結果 ndkale が実装の品質が高く、手を入れれば本番環境でも使えそうだったので利用しました。

engblog.nextdoor.com

概要はこのブログに記載されていますが、Celery で苦しんだ末に一から非同期ワーカーを実装されたようです。このブログ当時で 100万/day 単位のタスクを分散環境で処理している実績があります。

プレフィックスの nd は作成した会社の NextDoor からきているんだと思います。パッケージ名自体は kale です。2014 年に作られたライブラリで 5 年間運用されているようで、現在でもメンテナンスされているみたいです。

ndkale の最大の特徴は、優先度付きキュー に対応していることです。実現するために Lottery Algorithm を拡張した独自の Reduced Lottery というアルゴリズムでスタベーションなくメッセージを処理します。

もちろん、SQS を正しく使ってデータのロストがおこらないように実装しています(実際にはバグがありましたが、僕が修正 PR を投げました)。再実行についてはリトライ回数をインクリメントしたメッセージを再送信してからメッセージを消すようにしています。SQS のメッセージは Immutable なので再送信をしているようです。SQS が用意している Dead Letter Queue の仕組みは利用せず、独自に再実行回数を判別して Dead Letter Queue へ送信しています。

内部実装を読みましたが、設計と実装の品質がとても高い です。綺麗に書かれているのでスラスラ読めました。設計もよく考えられていて、利用者による機能の拡張がユーザーに露出するコンポーネントである「Task」と「Worker」を継承することで可能なような設計になっています。また、Task と Worker には様々なフックが用意されており、処理の注入もやりやすいです。

一方で、詳しいドキュメントがない という欠点もあります。簡単な使い方と ndkale の概念が README に書かれているだけで、手の込んだ使い方は自分で内部実装を読んで理解するしかありません。しかし、内部実装が品質が高くとてもわかりやすいので、読んでいるだけで 作者の気持ちがわかり、こうやって使えばいいんだなとか、ここを拡張すればいいんだな、みたいなことが読み取れます。読むと感動すると思います。すごい。

あと、仕様上の課題ですが、複数のキューに対応した時に メッセージがあるにも関わらず最大 20 秒間処理が止まってしまう 可能性があるという課題があります。 全てのキューにメッセージがない時にはランダムに選ばれたキューを Long Polling してメッセージを待ちます。しかし、その間に別のキューにメッセージがきても気づくことができず、Long Polling の最大時間である 20 秒間処理されない可能性があります。

この課題は頻繁にメッセージがくるような成熟したサービスであればそもそも Long Polling する時間も短く済むため問題にはなりませんが、全てのキューでメッセージの頻度が少ない場合に問題になります。解決策としては 複数キューでは使わずに単独のキューのみを監視するように使う 方法しかありません。最大の売りである優先度付きキューの機能は無視することになりますが、そのほかの単純なワーカーとしての品質が高いため ndkale を単独キューで利用します。

ndkale に足りないこと

ここまで大絶賛の ndkale ですが、残念ながらこのまま使うには不十分です。ただし、設計と実装の品質が高いため簡単に足りない機能を追加することができます。

単独キューを待って、複数のキューに送信する

Long Polling による別のキューのメッセージ遅延を防ぐためにワーカーは 1 つのキューのみに対してメッセージの監視をします。一方で、タスクの役割ごとに利用するキューは変えたいため、1 つのキューを監視して、複数のキューのうちのどれかに送信するということをしたいです。

残念ながら ndkale では、受信しているキューへ対してしか送信できないため、キューの設定ファイルを 2 つ用意して対応します。

ndkale では、キューの情報は QueueInfo のクラス変数に保持されています。内部実装を見る限り、受信用のキューは QueueInfo._queues から参照し、送信用のキューは QueueInfo._simple_name_queues_map から参照しているので、まず受信用のキューをローディングした後に、送信用のキューを QueueInfo._simple_name_queues_map に追加することで解決します。

from kale import settings, utils, worker
from kale.queue_info import QueueInfo

queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
# load queues for dequeue
QueueInfo(
    config_file=settings.QUEUE_CONFIG,
    sqs_talk=None,
    queue_cls=queue_class
)

# load queues for enqueue
queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
for q in queues:
    if q.simple_name not in QueueInfo._simple_name_queues_map:
        QueueInfo._simple_name_queues_map[q.simple_name] = q

キューの動的な選択

タスクは kale.Task クラスを継承して定義します。送信するキューの名前は継承したタスククラスの queue というクラス変数に設定しますが、これで固定されてしまいます。

タスクの publish() で動的に送信するキューを変更したいのですが、単にクラス変数を上書きするだけでは別プロセスで実行されるワーカーでリトライ時に再送信されるキューがデフォルトのものに戻ってしまうため、うまくいきません。並行処理での問題もあります。

そこで、メッセージが用意している app_data にキューの名前を指定してそれをワーカーにも伝搬させる方針をとることにしました。キューの送信を行う Publisher を差し替えることで実現しています。Publisher の差し替えも Task の継承のオーバーライドでできるように設計されていてすごいです。

from kale import publisher, task

class TaskClass:
    def __init__(self, task_class, module=None, name=None, queue=None, time_limit=None):
        self.__module__ = module or task_class.__module__
        self.__name__ = name or task_class.__name__
        self.queue = queue or task_class.queue
        self.time_limit = time_limit or task_class.time_limit

class Publisher(publisher.Publisher):
    def publish(self, task_class, task_id, payload, current_retry_num=None, delay_sec=None):
        """
        Override original method to inject TaskClass
        """
        app_data = payload.get('app_data')
        if isinstance(app_data, dict):
            task_class = TaskClass(task_class, queue=app_data.get('queue'))
        super(Publisher, self).publish(task_class, task_id, payload, current_retry_num=current_retry_num,
                                       delay_sec=delay_sec)

task_publisher = None

class BaseTask(task.Task):
    @staticmethod
    def _get_publisher():
        """
        Override original class method to inject custom Publisher
        """
        global task_publisher
        if task_publisher is None:
            task_publisher = Publisher()
        return task_publisher


class SomeTask(BaseTask):
    pass

# dynamic queue selection for enqueue
SomeTask.publish({'queue': 'not_default_queue'}, msg)

再実行時刻を指定するリトライエラー

タスク実行時にエラーが発生した場合は Task._get_delay_sec_for_retry(current_retry_num) で計算される秒数待って再実行するようにして、exponential backoff に対応しています。

しかし、エラーに設定した時刻に再実行されるほうが扱いやすいので、retry_at を設定したエラーを送出することでその時刻に再実行するように DelaySeconds を設定して欲しいです。

残念ながら、ndkale はエラーの種類による動的なリトライ時間の計算は想定していなかったようで、ndkale 本体に手を入れなければこれは実現できません。

# kale.task.Task
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        return cls._get_delay_sec_for_retry_with_message(message.task_retry_num)

    def handle_failure(cls, message, raised_exception):
        # delay_sec = cls._get_delay_sec_for_retry(message.task_retry_num)
        delay_sec = cls._get_delay_sec_for_retry_with_message(message, raised_exception)

https://github.com/aces-inc/ndkale/commit/4787a8a0218fe575621bccc7752635b291e61c39#diff-9230566b060f8c0681ff4527bc9e483aR174

ndkale をこのように修正した上で以下のように Task を拡張して _get_delay_sec_for_retry_with_message() 関数をオーバーライドすればリトライ時刻の指定が可能になります。

class RetryError(Exception):
    def __init__(self, msg, retry_at=None):
        """
        retry_at must be set tzinfo
        """
        self.msg = msg
        self.retry_at = retry_at

class BaseTask(task.Task):
    @classmethod
    def _get_delay_sec_for_retry_with_message(cls, message, raised_exception):
        if isinstance(raised_exception, RetryError) and raised_exception.retry_at is not None:
            now = timezone.now()
            if raised_exception.retry_at < now:
                # retry immediately
                return 0
            diff = raised_exception.retry_at - now
            if diff.seconds > 900:
                # SQS max delay_time is 15 minutes (900 seconds)
                return 900
            else:
                seconds = diff.seconds
                if diff.microseconds > 0:
                    seconds += 1
                return seconds

        return super(BaseTask, cls)._get_delay_sec_for_retry_with_message(message, raised_exception)

retry_at0 の場合は即時再実行、None の場合は従来の exponential backoff でリトライします。また、SQS の DelaySeconds の最大値は 900 秒であるため、それより長い場合には 900 秒に切り詰めます。また秒以下の値は切り上げになります。

Django で使う

Django と組み合わせて使うためには、Django のコンテキストをうまく初期化して使う必要があります。Django の初期化をしつつ Worker を単独で起動する方法として、manage.py でコマンドとして起動する方法を採用します。

また、タスク内で Django のデータベースへのアクセスをする場合は、コネクションプールにあるデータベースへのコネクションを気にする必要があります。コネクションは、データベースに設定されたアイドル時間によるタイムアウトや大きすぎるサイズのクエリなどによって強制的に切断されてしまう場合があります。

これらの切断されたコネクションをそのまま使うと以下のようなエラーが発生します。

(2006, 'MySQL server has gone away')

これを解決するためには切断されたコネクションをコネクションプールから排除する必要がありますが、Django は自動でコネクションプールの死活管理をしてくれません。そのため、タスクの実行後に close_old_connections() を呼び出してコネクションプール内の切断されたコネクションを排除します。

Django のコマンドとして呼び出すために、<package_name>/management/commands/worker.py に以下のようなコマンドを定義します。

from __future__ import absolute_import, print_function

import logging

from django.core.management.base import BaseCommand
from django.db import close_old_connections
from kale import settings, utils, worker
from kale.queue_info import QueueInfo

logger = logging.getLogger(__name__)

class DjangoWorker(worker.Worker):
    def _on_task_succeeded(self, message, time_remaining_sec):
        super()._on_task_succeeded(message, time_remaining_sec)
        # cleanup db connection pool for each task
        close_old_connections()

    def _on_task_failed(self, message, time_remaining_sec, err, permanent_failure):
        super()._on_task_failed(message, time_remaining_sec, err, permanent_failure)
        # cleanup db connection pool for each task
        close_old_connections()

class Command(BaseCommand):
    help = 'worker'

    def handle(self, *args, **options):
        queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
        # load queues for dequeue
        QueueInfo(
            config_file=settings.QUEUE_CONFIG,
            sqs_talk=None,
            queue_cls=queue_class
        )

        # load queues for enqueue
        queues = QueueInfo._get_queues_from_config(settings.ENQUEUE_CONFIG, queue_class)
        for q in queues:
            if q.simple_name not in QueueInfo._simple_name_queues_map:
                QueueInfo._simple_name_queues_map[q.simple_name] = q

        logger.info('Task worker is running ...')
        DjangoWorker().run()

python manage.py worker と実行すればワーカーが起動されます。

テストで即時実行したい

非同期処理をテストすることはいつそのタスクが実行されるかを制御する必要があって難しいです。そのため、テストの時だけ非同期処理を同期的に実行したくなります。

残念ながら ndkale にはテスト時にだけ同期的に処理する仕組みはないですが、以下のように publish_now() クラスメソッドを Task に追加することで実現できます。

class BaseTask(task.Task):
    @classmethod
    def publish_now(cls, app_data, *args, **kwargs):
        tsk = cls({})
        tsk.run(*args, **kwargs)

テスト時には、publish() クラスメソッドを publish_now() に差し替えます。

class SampleTask(BaseTask):
    pass

from unittest import mock, TestCase

class SampleTests(TestCase):
    @mock.patch('SampleTask.publish', SampleTask.publish_now)
    def test_sample(self):
        SampleTask.publish()

バグを直す

ndkale の内部実装を眺めているうちにいくつかのバグに気づいたので本家のリポジトリにプルリクエストを送りました。いずれもいいレスポンスをもらえたのでそのうちマージされると思います。

github.com

メッセージの削除の順序が間違っていてデータロストする可能性があった課題を解決したプルリクエストです。

github.com

タスクの即時実行ができなくなってしまっている課題を解決したプルリクエストです。後方互換性のためにもう少し工夫する必要があるみたいですが、そのうち直します。

最後に

さて、かなり長くなってしまいましたが ndkale を紹介しました。

とても品質の高いプロダクトなので内部実装を読んで勉強してみるのもいいと思います。あと、Celery を SQS で使うのはやめることをおすすめします。

golang.tokyo #28 で登壇して賞品をいただきました

限りある時間を大切に。どうも、かわしんです。情報洪水の現世をスマートに生きていきましょう。

さて、去る 12 月 4 日に開催された 「golang.tokyo #28 ~年末だよ!Go大忘年会&LT大会!~」 で LT をしてきました。

golangtokyo.connpass.com

発表資料はこれです。

半年前に作った Twilter の話で、なぜ作ったのかと工夫したフィルターの仕組みについて解説しました。

内容としては以前の記事の内容を濃縮した感じです。

kawasin73.hatenablog.com

この LT 大会では最後に参加者全員で投票してよかった LT を決める時間があり、その中で第4位に選んでいただきました。

1位から賞品を選んでいったのですが、僕が一番欲しかった「Go 言語による並行処理」のオライリー本をいただくことができました。ありがとうございます。

f:id:kawasin73:20191214161253j:plain

最後に発表中の反応ツイートを引用して筆をおきたいと思います。

Go でトランザクションをフルスクラッチで実装した

一歩ずつ一歩ずつ前へ進んでいく、確実に。どうも、かわしんです。

到底 1 記事に収まるような内容ではなく長いので、トランザクションの作り方に興味のない方は途中の「なぜ Go なのか」まで読んでいただければ嬉しいです。

この記事は、Go2 Advent Calendar 2019 の 7 日目と セキュリティキャンプ 修了生進捗 #seccamp OB/OG Advent Calendar 2019 の 7 日目を兼用しています。

さて、僕の興味は必要になったライブラリやミドルウェアなどを自作して、作りたいプロダクトを完成させることです。必要なコンポーネントがないからといってプロダクトを作るのを諦めたり妥協したりはしたくありません。

多くのアプリケーションではデータベースは重要なコンポーネントです。大抵のアプリケーションは MySQL や Postgres、Redis など既存のデータベースシステムで対応することができますが、既存のデータベースシステムでは対応できないような場合に備えてデータベースシステムを作れる力をつけておきたいと思っていました。

そこで、今回はデータベースシステムを作る前哨戦として トランザクション を実装します。出来上がったものはこちらです。

github.com

僕は、今年の セキュリティ・ネクストキャンプ に参加したのですが、同時開催されていたセキュリティキャンプのデータベースゼミの講師である 星野さん にお願いして講座で利用された教科書をいただき、キャンプが終わってからその教科書を参考に設計と実装を行いました。

どんなトランザクションを作るのか

トランザクションとは ACID を満たして複数のデータの書き込みや読み込みをできるようにする仕組みだと僕は認識しています。

ACID は、A (Atomicity : 不可分性)、C (Consistency : 一貫性) 、I (Isolation : 独立性)、D (Durability : 永続性) ですね。

ここでは詳しくは説明しないので、以下の資料を読むといいかと思います。

qiita.com

マジモンのトランザクションをガチ実装するのは初心者の僕には大変なので、実装しやすいように色々制約をつけたトランザクションを作ります。

  • KVS (Key Value Store) にする
    • シンプルな操作 INSERT UPDATE READ DELETE COMMIT ABORT の操作だけに対応します。SQL などはありません。
  • 範囲検索はしない
    • インデックスの実装をサボるために範囲検索はスコープ外としています。(あと、S2PL でも助けられます)
  • インメモリデータベースとする
    • 全てのデータはメモリに載る前提です。それより大きなデータには対応しません。
    • Buffer Pool などをサボることができます。(あと、undo ログをサポートしなくてもよくなります)
  • non-deterministic DBMS を想定します
    • deterministic DBMS とはあらかじめトランザクションの処理内容がわかっているものです。例えばストアドプロシージャとかでしょうか。
    • 逆に non-deterministic ではトランザクション内で次にどの処理命令がくるかがわかりません。いつ COMMIT されるのか ABORT されるのかもわかりません。普通はこちらが使われてると思います。
    • non-deterministic DBMS ではトランザクションは ACID の I を担保するために頑張る必要があります。
  • WAL を使う
    • Write Ahead Log の略です。データベースに対する何らかの更新(Write)を行う前にその操作の内容をログとして追記し、更新が反映する途中でプロセスが死んだ場合でもその更新を再開したりロールバックできるようにして、ACID の A と D を実現するための仕組みです。
    • ログは COMMIT ログが永続化された変更は永続化されたとして確定し、プロセスが死んで再開された後に復旧されます(Crash Recovery)。
    • COMMIT ログが永続化される前にプロセスが死んだ場合は WAL ログに書かれた更新の内容は失敗したとして破棄されたり書き戻されたりします。
    • つまり、WAL に COMMIT ログが書かれて初めてクライアントにトランザクションの更新が成功したと返信することができます。
    • 参考 : バッファ管理とWAL - Qiita
    • ログには undo ログと redo ログの 2 種類がありますが、今回は redo ログのみを使います。
  • チェックポイントの作成は起動時と終了時のみ
    • トランザクションが動いているときにチェックポイントを書き出すのはややこしいので今回はサボります。
    • プロセスの終了が遅くなってしまいますが許容します。

このような制約がありつつも、マルチスレッドで ACID を守って正しく動くトランザクションを作ります。

どうやってトランザクションを作るのか

何を作る時でも同じですが、「まず遅くてもいいから正しく動くものを愚直に作り、それに最適化を施すことによって効率的に速く動くようにしていく」 という作り方が鉄則です。

今回は以下の3ステップに分けて実装していきます。

Step 1 は、Trivial Scheduler と呼ぶそうです。同時に1つのトランザクションしか動かないので排他制御も必要ありません。ただ、愚直に実装するのみです。ただし、今後はこの実装に積み重ねていくのでインターフェイスなどはちゃんと考えて、正しく振る舞うように実装します。

Step 2 では、複数のトランザクションが同時に動くので 並行制御 をする必要があります。並行制御アルゴリズムには様々なものがありますが、初心者向けとして S2PL (Strict Two Phase Lock) が教科書で勧められていたのでこれを採用します。

Step 3 では、Step 2 の実装をマルチスレッドで動くようにします。インデックスやデータストアや WAL などは同時にアクセスによるデータ競合を防ぐために 排他制御 をする必要が出てきます。

なぜ Go なのか

今回は Go で実装していくわけですが、その一番大きな理由は goroutine の存在です。

Step 2 の並行制御をするためには並行スケジューラが必要になります。しかし、goroutine は I/O と自動で連携したコルーチンであり、Go のランタイムに並行スケジューラが実装されているため、Go を使うことでこのトランザクションの中で一番大変と思われる 並行スケジューラの実装をサボる ことができるのです。

むしろ、並行スケジューラを実装しないでトランザクションの実装を勉強したと言えるのかどうか怪しいところではありますが、それは C や C++ で実装するときにやるので勘弁して欲しいです。今回はトランザクションの概観をざっくり知るために作っているので。

また、Step 3 でマルチスレッド化するわけですが、goroutine ベースで作るとランタイムが 自動でスレッド間で適切に分散してスケジューリング してくれます。正直あっけないくらい簡単にマルチスレッド対応できてしまいました。

また、Go はミドルウェアなどを プロトタイピングするのに向いている言語 だと思っています。

goroutine もそうですが、map やスライスなど必要なコンポーネントが言語自体や標準ライブラリに含まれていますし、GC がついているので雑にメモリを確保することができます。また、バイト列が []byte として柔軟に扱える上に、多くのインターフェイス[]byte を利用することが想定されている (io.Reader など) ため、ファイル操作やバイナリ処理をするようなミドルウェアの開発に向いています。さらに、そこそこ速いため速さを求められないようなプロダクトではそのままプロダクションで使えてしまいます。

トランザクションを作っていく!

ここでは順を追ってどうやって実装していったかを紹介していきます。全て main.go の1ファイルに収まる 1000 行に満たない小さな実装なので、ぜひコードを読んでみてください。それぞれのステップでブランチがきられています。

Step 1 : シングルスレッドで同時に1トランザクションのみが動く

GitHub - kawasin73/txngo at trivial_scheduler

まず Go で シングルスレッド を実現しなくてはなりません。Go は M:N 方式 (1, 2) で動くため厳密にシングルスレッドでは動きません。しかし、goroutine を実行する主体である processor を 1 つだけに限定すればシングルスレッドと同じように 1 つの goroutine のみが同時に動き、データ競合も発生しません。

// execute on single thread
runtime.GOMAXPROCS(1)

トランザクションは同時に1つしか動かないため入出力インターフェイスも1つだけで十分です。そこで、標準入出力 を使ってターミナルから柔軟にクエリを流し込めるようにしました。

さて、トランザクションは以下のような構成で実現しています。

f:id:kawasin73:20191206180322p:plain

起動時に CheckPoint ファイルから永続化されている 全てのデータが DB に読み込まれ ます。DB は map[string]Record で表されたハッシュマップで、キーからレコードを高速に引いてくることができ、インデックスの役割も兼ねています。

トランザクションの更新した変更は途中で ABORT されて変更が全て破棄される可能性があります。もし COMMIT される前に更新内容を DB に反映していた場合はその内容を元に戻す必要があります。さらに、今後複数トランザクションを同時に動かす時には、MVCC などを導入しないと確定していないデータを他のトランザクションに読まれてしまうダーティリードになってしまいます。

この課題を解決するために トランザクションの更新した変更は Write Set に保存して、COMMIT された後に DB に書き戻す ようにします。

更新処理の中で以下のように key を処理しているのを不思議に思うかもしれません。これは引数で渡された文字列 key が別の文字列の一部であった時、key が DB に保存されることで元の文字列が GC されなくなってしまうことを防ぐためです。明示的に key 文字列のメモリを確保し直して、外部のメモリへの参照を発生させないようにしています。

// reallocate string
key = string(key)

READ ではトランザクション内での変更結果を読む必要があるため、まず Write Set を検査して、なかった場合に初めて DB を読みにいくようにします。READ での検索を高速にするために Write Set は map[string]RecordCache で表されたハッシュマップで実現します。レコードが削除された場合でも DB にはレコードはあるので Write Set に削除されたフラグを持った RecordCache を保持します。

COMMIT 時にトランザクションでの 更新履歴を全てログとして一度に WAL に書き出し ます。本当は更新時に逐一 WAL に書き出して COMMIT 時には COMMIT ログだけを永続化することが望ましいのですが、次のステップで複数のトランザクションから同時に WAL に更新ログを書き込むとログの順序が割り込まれてしまうため、一度に書き出すようにしました。(トランザクション ID を使えば解決しますが、ID の払い出しを考えたくなかったのでこの方法にしています。また、ABORT される内容を WAL に書かないのでちょっと効率的かなとか思ってます)

また、Write Set のハッシュマップでは更新した順番がわからないため、ログの履歴を保持するスライス Logs ([]RecordLog) を用意し、その内容を WAL に書き出します。

ABORT するときは、Write Set の中身と Logs をクリアするだけでトランザクションの変更内容は全て破棄されます。

プロセスの終了時に DB に保存された内容を全て CheckPoint ファイルに書き出します。しかし、ファイルの上書き中にプロセスが突然死すると元々の CheckPoint ファイルの内容が壊れてしまいます。そこで一時ファイルに内容を書き出した後に os.Rename() してアトミックにファイルを差し替えて CheckPoint ファイルを更新します。上書きされたファイルの内容は自動で削除されます。

CheckPoint ファイルへの書き出しが永続化されたら WAL ファイルの内容は全て必要なくなります。(WAL の中の全ての更新履歴が CheckPoint ファイルに反映されているため)そのため、WAL ファイルを os.Truncate() してクリアします。

起動時に WAL ファイルの中身があるときは前回のプロセスが異常終了したことを表します。(正常終了では必ず WAL ファイルはクリアされているため)そのときは Crash Recovery を行います。WAL ファイルの更新履歴を COMMIT ログがあるところまで再実行して CheckPoint ファイルの内容を更新します。


やっていることはこれだけです。これだけの処理をするだけで ACID を守ったトランザクションが出来上がってしまいます。

Step 2 : シングルスレッドで同時に複数のトランザクションが並行に動く

GitHub - kawasin73/txngo at S2PL-single-thread

まず、複数のトランザクションに同時にクエリを送信するために、インターフェイスは標準入出力の他に TCP コネクション をサポートするようにしました。telnet で接続すると標準入出力と同じやり取りを複数同時に行えるようになります。TCP コネクションごとにトランザクション実行主体が作成されます。

複数のトランザクションを並行に動かすために並行制御アルゴリズムである S2PL を導入します。トランザクションはそれぞれ goroutine のなかで並行に動いています。

S2PL とは Two Phase Lock を発展させたもので、あるレコードへの排他ロック(Write Lock)または共有ロック(Read Lock)の獲得を成長相のみで行い、トランザクションの完了(WAL への COMMIT ログの書き込み)後に排他ロックを解放するというものです。共有ロックはトランザクションの完了前に解放することが許されますが、排他ロック・共有ロックのどちらのロックも解放したあとは縮退相となりロックの獲得はできなくなります。

この S2PL を導入するためにそれぞれの操作に以下の処理を追加します。

  • READ するときは Read Lock を獲得
  • INSERT, UPDATE, DELETE をするときは Write Lock を獲得
  • COMMIT するときは以下の順番で処理
    1. 獲得した全ての Read Lock を解放
    2. WAL の書き込みと永続化
    3. Write Set の中身を DB に反映
    4. 獲得した全ての Write Lock を解放
    5. Write Set と Logs をクリア
  • ABORT するときは獲得した全ての Lock を解放

ここで1つ課題が発生します。READ したレコードを更新する場合 の処理は自明ではなく、明確に決める必要があります。

すでに Write Lock を獲得した後は READ をする場合でも更新処理をする場合でも新たにロックを取ることはありません。しかし、Read Lock を獲得した後に更新する場合はそれを Write Lock に変換する必要があります。

Go には、sync.RWMutex という Read Lock と Write Lock が可能な Mutex コンポーネントがありますが、残念ながら Read Lock を Write Lock に変換する方法は提供していません。もし、Read Lock を解放して Write Lock を獲得し直すと別の goroutine で Write Lock 待ちしていたトランザクションにロックを横取りされてしまう可能性があります。

ここで僕が考えた解決策は以下の2つです。

前者は、標準ライブラリの sync.RWMutex だけで実現できるため簡単に実装できます。しかし、ユーザはトランザクション開始時に明示的に Read-Write か Read-Only であるかを宣言する必要があります。正直これは使い勝手が悪いです。そのくらい自動で判別してほしい。

処理パフォーマンスの高速化やデッドロックの回避を理由にしてこの手法を採用するのであればいいのですが、コンポーネント自作が難しいからという理由でユーザに不自由を押し付けることはしたくありません

そこで後者の昇格可能な RWMutex を自作する手法を選択して github.com/kawasin73/umutex を自作しました。詳しくは以下の記事で紹介しています。

kawasin73.hatenablog.com

昇格可能な RWMutex では、Read Lock を獲得した goroutine 同士が昇格しようとすると デッドロック を起こしてしまうという課題がありますが、この umutex ではデッドロックを検知する仕組みが備わっています。

f:id:kawasin73:20191206203651p:plain

ロックはレコードごとに獲得できるようにするために Locker というコンポーネントを追加しました。S2PL では範囲スキャンをした時にロックの穴あきによる予期せぬアクセスを防ぐため、範囲ロックを工夫しないといけないのですが、今回のトランザクションは範囲スキャンをサポートしていないのでシンプルな map[string]*lock で実現しています。レコードごとに複数の Read Lock が起こり得るため参照カウントを持ち不必要なロックオブジェクトを削除しています。

トランザクションでは Write Set にあるレコードは全て Write Lock を獲得していることを表します。同様に Read Lock を獲得していることを表現するために Read Set も追加しました。

Step 1 では、更新内容は Logs と Write Set の両方に保存していましたが冗長であることに気づいたので、Write Set には Logs のインデックス番号だけを保存するように最適化しました。

COMMIT 処理でのログの書き込み中に他のトランザクションの COMMIT 処理が割り込まれるとデータが壊れてしまいます。goroutine は厳密なシングルスレッドではないため、goroutine が COMMIT 処理中にコンテキストスイッチを起こして別の goroutine の COMMIT 処理が割り込まれる可能性があります。3 そのため、COMMIT 処理全体を sync.Mutex によって排他制御するようにしました。


S2PL を導入するために昇格可能な RWMutex を実装するだけで、並行制御されたトランザクションが実装できてしまいました。goroutine の強力さのおかげです。

Step 3 : マルチスレッドで同時に複数のトランザクションが並行に動く

GitHub - kawasin73/txngo at S2PL-multi-thread

最終ステップとしてマルチスレッドで動かすために、runtime.GOMAXPROCS(1) を消します。

あとは、複数のスレッドから同時アクセスされそうな所に排他制御を導入するだけです。Locker と DB へのアクセスをするときに map の更新で競合する場合があるので sync.Mutexsync.RWMutex で保護するようにしました。


まさかのマルチスレッド対応が一瞬で終わってしまいました。goroutine にのっかるだけで同じロジックでマルチスレッドで動くというのはすごいことです。

最後に

無事、マルチスレッドで動く S2PL で並行制御されたトランザクションを完成させることができました。こんなにあっさりできてしまったのは goroutine のおかげです。

ただ、最初に述べたように色々制約をつけてサボっているところが多いです。インデックスにデータ構造を適用したりとか、バッファプールを追加したりとか、チェックポイントの作成を定期的に行ったりとか、WAL をロックを取らずに書き込んだりとか色々改善の余地はあります。

しかし、Go はメモリが枯渇することを想定しない 4 言語である上に、メモリ管理が GC と密に結合しているのでメモリ枯渇への対応であるバッファプールを実装するのはちょっと辛いです。([]byte から特定の型へ unsafe.Pointer などを使ってキャストしても GC の対象外となる)

次は、そういう改善点と並行スケジューラの実装を含めて柔軟な言語である C や C++ で実装してみたいと思っています。

最後になりますが、今回の試みはセキュリティキャンプで教科書を提供していただき、途中でレビューをしてくださった 星野さん のご協力なしには実現できないものでした。改めて感謝申し上げまして、この記事を終わりたいと思います。

Go で昇格可能な RWMutex を実装した

順番は守りましょう。どうも、かわしんです。トランザクションを実装中です。

さて、先日トランザクションの並行制御アルゴリズムである「S2PL (Strict Two Phase Lock)」を実装した 1 のですが、Read オペレーションでは Read Lock を取った後にすぐに解放していました。2 しかし、よくよく考えると 2PL はロックを解放した後にロックを取得することを禁じているため(だからこそ 2 phase なのですが)、実装が間違っていました。

つまり、Read Lock して Read した値はロックを保持し続け、Write するときにロックを Write Lock に昇格させる必要がありました。

この振る舞いを実現するためには、昇格に対応した Read Write Lock の仕組みが必要になります。

Go には sync.RWMutex という goroutine に対応した便利な Read Write Lock の仕組みが用意されていますが、昇格には対応していません。Github を探しても使えそうなライブラリはなかったので自作しました。

github.com

今回はこのライブラリの紹介です。

欲しかったもの

欲しかった機能は以下の通りです。

既存のライブラリ

一応 Go におけるロックの実装を調べたので、既存のライブラリがなぜ使えなかったのかを列挙していきます。

世の中に必要なコンポーネントがないからと、プロダクトを作るのを諦めたり、仕様を捻じ曲げたりするのは嫌いなので、自作しました。

実装

ロックの仕組みと実装方法

Go でのロックの仕組みはこの動画がわかりやすいです。

www.youtube.com

Mutex は以下の 2 段階に分けてロックが取られています。

  1. アトミックな操作による spin lock
  2. goroutine を停止させてキューに詰めて待つ

sync.Mutex で後者の goroutine 同士の待ち合わせの処理は runtimeAPI である runtime_SemacquireMutex() でおこなっているようですが、これは標準ライブラリである sync にだけ公開されているため、実質 goroutine 同士のプリミティブな待ち合わせ処理を僕たち一般の開発者が利用することはできません。

そこで、sync.Mutexsync.RWMutex といった標準ライブラリの Mutex コンポーネントを利用して実装することにしました。

注意すること

ここで昇格可能な Read Write Lock の振る舞いとして注意しておくべきことを紹介します。

1 つ目は、Lock() は Upgrade() を妨げない ということです。Upgrade()sync.RWMutex で愚直に実装しようとすると RUnlock() した後に Lock() をすることになると思います。

func (m *UMutex) Upgrade() {
    m.rwmu.RUnlock()
    m.rwmu.Lock()
}

しかしこの時、別の goroutine が Lock() をして既に取得されている Read Lock が解除されるのを待っているとすると、Upgrade()RUnlock() でロックが横取りされてしまいます。ロックを横取りされないように実装する必要があります。順番は守りましょう

2 つ目は、複数が同時に Upgrade() するとデッドロックしてしまう ということです。Upgrade() は自身以外の全てが Read Lock を手放すまで待ち合わせます。しかし、複数が Upgrade() するということはお互いに Read Lock を手放さないため Dead Lock してしまいます。

品質の低いプロダクトであるなら Dead Lock するようなクエリを発行するユーザの責任だとして、goroutine がリークしようがお構いなしなのかもしれませんが、使い勝手は非常に悪くなります。そのため、デッドロックする場合はタイムアウトや Dead Lock の検出をして自動で失敗させるなどの Developer Friendly な振る舞いをすることが望まれます。

実際のコード

実装はそんなに難しくなくて 51 行とシンプルです。 コミットログをたどるとわかりますが、ちょうど 1 時間で実装していました。(実装開始時の設計では順番抜かしがあることが発覚して途中で設計をやり直しているので時間がかかりました)

前述の注意する点ですが、

  • Lock の横取り問題は UMutex.u の個数を確認して Upgrade 中であればロックを解放してリトライするようにして解決しました。
  • デッドロック問題は、UMutex.u をアトミックに操作して同時に Upgrade している個数を管理して、既に Upgrade 待ちをしている場合はデッドロックを検出して Upgrade を失敗させることで解決しました。

短いので実際のコードも貼り付けておきます。参考までに。

package umutex

import (
    "sync"
    "sync/atomic"
)

// UMutex is simple implementation of Upgradable RWMutex.
type UMutex struct {
    rwmu sync.RWMutex
    u    int32
}

// RLock locks shared for multi reader.
func (m *UMutex) RLock() {
    m.rwmu.RLock()
}

// RUnlock unlocks reader lock.
func (m *UMutex) RUnlock() {
    m.rwmu.RUnlock()
}

// Lock locks exclusively for single writer.
func (m *UMutex) Lock() {
lock:
    m.rwmu.Lock()
    if atomic.LoadInt32(&m.u) > 0 {
        // Upgrade is given priority to Lock, retry lock.
        m.rwmu.Unlock()
        goto lock
    }
}

// Unlock unlocks writer lock.
func (m *UMutex) Unlock() {
    m.rwmu.Unlock()
}

// Upgrade converts reader lock to writer lock and returns success (true) or dead-lock (false).
// If Upgrade by multi reader locker at same time then dead-lock.
// Upgrade is given priority to Lock.
func (m *UMutex) Upgrade() bool {
    success := atomic.AddInt32(&m.u, 1) == 1
    if success {
        m.rwmu.RUnlock()
        m.rwmu.Lock()
    }
    atomic.AddInt32(&m.u, -1)
    return success
}

まとめ

これで昇格可能な Read Write Mutex の Go の実装ができました。次回はとうとうトランザクションを実装します。多分 Go 2 のアドベントカレンダー になると思いますが。

Go : Mutex は channel の 4 倍速い

速ければ速いほど良い。どうもかわしんです。トランザクションを実装中です。

トランザクションの並行処理で S2PL (Strict Two Phase Lock) を Go で実装しようとしているのですが、どうしても昇格可能な Reader Writer Mutex が必要になり、Github にいい実装がなかったので自分で実装しようとしています。

さて、独自の Mutex を実装するにあたり goroutine 同士の待ち合わせは何かを使って実現する必要がありますが、Go には sync.Mutex とチャネルがあります。

どちらとも、ロックしてアンロックするということができます。振る舞いは同じです。

振る舞いが同じとなればどちらが速いかが重要となります。

ということで実験してみました。

実験

環境は以下の通りです。

ベンチマークコード

以下を適当に main_test.go としておきます。

package main

import (
    "sync"
    "testing"
)

func BenchmarkMutex(b *testing.B) {
    var mu sync.Mutex
    var n int
    for i := 0; i < b.N; i++ {
        mu.Lock()
        n++
        mu.Unlock()
    }
}

func BenchmarkChan(b *testing.B) {
    ch := make(chan struct{}, 1)
    var n int
    for i := 0; i < b.N; i++ {
        ch <- struct{}{}
        n++
        <-ch
    }
}

func BenchmarkMultiMutex(b *testing.B) {
    var mu sync.Mutex
    var n int
    var wg sync.WaitGroup
    wg.Add(10)
    for j := 0; j < 10; j++ {
        go func() {
            for i := 0; i < b.N; i++ {
                mu.Lock()
                n++
                mu.Unlock()
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

func BenchmarkMultiChan(b *testing.B) {
    ch := make(chan struct{}, 1)
    var n int
    var wg sync.WaitGroup
    wg.Add(10)
    for j := 0; j < 10; j++ {
        go func() {
            for i := 0; i < b.N; i++ {
                ch <- struct{}{}
                n++
                <-ch
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

BenchmarkMulti****() は、10 の goroutine で同時に動かすものです。

結果

GOMAXPROCS=1 でおこなったものがこれです。

$ GOMAXPROCS=1 GO111MODULE=off go test -bench .
goos: darwin
goarch: amd64
BenchmarkMutex          100000000               10.4 ns/op
BenchmarkChan           28564992                40.5 ns/op
BenchmarkMultiMutex      9439315               126 ns/op
BenchmarkMultiChan       1000000              1189 ns/op
PASS
ok      _/Users/kawasin73/work/sample-go/rwlock 4.808s

普通のマルチスレッドでおこなったものがこれです。

$ GO111MODULE=off go test -bench .
goos: darwin
goarch: amd64
BenchmarkMutex-8                100000000               10.7 ns/op
BenchmarkChan-8                 27600146                42.3 ns/op
BenchmarkMultiMutex-8            2536651               494 ns/op
BenchmarkMultiChan-8              736453              1710 ns/op
PASS
ok      _/Users/kawasin73/work/sample-go/rwlock 5.307s

まとめ

sync.Mutex の方がチャネルよりだいたい 4 倍速いです。チャネルは内部でロックを取った上でごにょごにょやっているので遅いのは当たり前です。

また、シングルスレッドにするとスレッド同士の待ち合わせが無くなったりシングルスレッド用の最適化があるのかどうかはわかりませんが、速くなります。速くなるというよりは、マルチスレッドで動かすと遅くなるという表現の方が理解しやすそうですが。

シンプルな昇格可能な RWMutex を作る分には sync.Mutex を使う方向でできそうです。

ただ、タイムアウトをさせようとすると Go では必ずチャネルを使わないといけなくなりそうなのが残念です。