kawasin73のブログ

技術記事とかいろんなことをかくブログです

Go で昇格可能な RWMutex を実装した

順番は守りましょう。どうも、かわしんです。トランザクションを実装中です。

さて、先日トランザクションの並行制御アルゴリズムである「S2PL (Strict Two Phase Lock)」を実装した 1 のですが、Read オペレーションでは Read Lock を取った後にすぐに解放していました。2 しかし、よくよく考えると 2PL はロックを解放した後にロックを取得することを禁じているため(だからこそ 2 phase なのですが)、実装が間違っていました。

つまり、Read Lock して Read した値はロックを保持し続け、Write するときにロックを Write Lock に昇格させる必要がありました。

この振る舞いを実現するためには、昇格に対応した Read Write Lock の仕組みが必要になります。

Go には sync.RWMutex という goroutine に対応した便利な Read Write Lock の仕組みが用意されていますが、昇格には対応していません。Github を探しても使えそうなライブラリはなかったので自作しました。

github.com

今回はこのライブラリの紹介です。

欲しかったもの

欲しかった機能は以下の通りです。

既存のライブラリ

一応 Go におけるロックの実装を調べたので、既存のライブラリがなぜ使えなかったのかを列挙していきます。

世の中に必要なコンポーネントがないからと、プロダクトを作るのを諦めたり、仕様を捻じ曲げたりするのは嫌いなので、自作しました。

実装

ロックの仕組みと実装方法

Go でのロックの仕組みはこの動画がわかりやすいです。

www.youtube.com

Mutex は以下の 2 段階に分けてロックが取られています。

  1. アトミックな操作による spin lock
  2. goroutine を停止させてキューに詰めて待つ

sync.Mutex で後者の goroutine 同士の待ち合わせの処理は runtimeAPI である runtime_SemacquireMutex() でおこなっているようですが、これは標準ライブラリである sync にだけ公開されているため、実質 goroutine 同士のプリミティブな待ち合わせ処理を僕たち一般の開発者が利用することはできません。

そこで、sync.Mutexsync.RWMutex といった標準ライブラリの Mutex コンポーネントを利用して実装することにしました。

注意すること

ここで昇格可能な Read Write Lock の振る舞いとして注意しておくべきことを紹介します。

1 つ目は、Lock() は Upgrade() を妨げない ということです。Upgrade()sync.RWMutex で愚直に実装しようとすると RUnlock() した後に Lock() をすることになると思います。

func (m *UMutex) Upgrade() {
    m.rwmu.RUnlock()
    m.rwmu.Lock()
}

しかしこの時、別の goroutine が Lock() をして既に取得されている Read Lock が解除されるのを待っているとすると、Upgrade()RUnlock() でロックが横取りされてしまいます。ロックを横取りされないように実装する必要があります。順番は守りましょう

2 つ目は、複数が同時に Upgrade() するとデッドロックしてしまう ということです。Upgrade() は自身以外の全てが Read Lock を手放すまで待ち合わせます。しかし、複数が Upgrade() するということはお互いに Read Lock を手放さないため Dead Lock してしまいます。

品質の低いプロダクトであるなら Dead Lock するようなクエリを発行するユーザの責任だとして、goroutine がリークしようがお構いなしなのかもしれませんが、使い勝手は非常に悪くなります。そのため、デッドロックする場合はタイムアウトや Dead Lock の検出をして自動で失敗させるなどの Developer Friendly な振る舞いをすることが望まれます。

実際のコード

実装はそんなに難しくなくて 51 行とシンプルです。 コミットログをたどるとわかりますが、ちょうど 1 時間で実装していました。(実装開始時の設計では順番抜かしがあることが発覚して途中で設計をやり直しているので時間がかかりました)

前述の注意する点ですが、

  • Lock の横取り問題は UMutex.u の個数を確認して Upgrade 中であればロックを解放してリトライするようにして解決しました。
  • デッドロック問題は、UMutex.u をアトミックに操作して同時に Upgrade している個数を管理して、既に Upgrade 待ちをしている場合はデッドロックを検出して Upgrade を失敗させることで解決しました。

短いので実際のコードも貼り付けておきます。参考までに。

package umutex

import (
    "sync"
    "sync/atomic"
)

// UMutex is simple implementation of Upgradable RWMutex.
type UMutex struct {
    rwmu sync.RWMutex
    u    int32
}

// RLock locks shared for multi reader.
func (m *UMutex) RLock() {
    m.rwmu.RLock()
}

// RUnlock unlocks reader lock.
func (m *UMutex) RUnlock() {
    m.rwmu.RUnlock()
}

// Lock locks exclusively for single writer.
func (m *UMutex) Lock() {
lock:
    m.rwmu.Lock()
    if atomic.LoadInt32(&m.u) > 0 {
        // Upgrade is given priority to Lock, retry lock.
        m.rwmu.Unlock()
        goto lock
    }
}

// Unlock unlocks writer lock.
func (m *UMutex) Unlock() {
    m.rwmu.Unlock()
}

// Upgrade converts reader lock to writer lock and returns success (true) or dead-lock (false).
// If Upgrade by multi reader locker at same time then dead-lock.
// Upgrade is given priority to Lock.
func (m *UMutex) Upgrade() bool {
    success := atomic.AddInt32(&m.u, 1) == 1
    if success {
        m.rwmu.RUnlock()
        m.rwmu.Lock()
    }
    atomic.AddInt32(&m.u, -1)
    return success
}

まとめ

これで昇格可能な Read Write Mutex の Go の実装ができました。次回はとうとうトランザクションを実装します。多分 Go 2 のアドベントカレンダー になると思いますが。