順番は守りましょう。どうも、かわしんです。トランザクションを実装中です。
さて、先日トランザクションの並行制御アルゴリズムである「S2PL (Strict Two Phase Lock)」を実装した 1 のですが、Read オペレーションでは Read Lock を取った後にすぐに解放していました。2 しかし、よくよく考えると 2PL はロックを解放した後にロックを取得することを禁じているため(だからこそ 2 phase なのですが)、実装が間違っていました。
つまり、Read Lock して Read した値はロックを保持し続け、Write するときにロックを Write Lock に昇格させる必要がありました。
この振る舞いを実現するためには、昇格に対応した Read Write Lock の仕組みが必要になります。
Go には sync.RWMutex
という goroutine に対応した便利な Read Write Lock の仕組みが用意されていますが、昇格には対応していません。Github を探しても使えそうなライブラリはなかったので自作しました。
今回はこのライブラリの紹介です。
欲しかったもの
欲しかった機能は以下の通りです。
- Read (shared) Lock と Write (exclusive) Lock の仕組み
- Read Lock から Write Lock への昇格 (Upgrade)
- Upgrade をする時はデッドロックする可能性が高いため、デッドロックを回避する仕組み
- シンプルで速い実装
既存のライブラリ
一応 Go におけるロックの実装を調べたので、既存のライブラリがなぜ使えなかったのかを列挙していきます。
- GitHub - tilinna/rumutex: A Go (golang) implementation of an upgradable RW lock.
- 昇格可能な R/W Lock を提供。
- 内部実装は
sync.RWMutex
を利用。Upgrade()
するときはアトミックにフラグを立てて競合していないことを確認してからRUnlock() -> Lock()
をしている。 Upgrade()
ではアトミックなフラグ操作ですでにUpgrade()
していることがわかったらreturn false
して関数を抜けてしまう。待たない。- ダメなところ
Lock()
がない。Write Lock をする時はRLock() -> Upgrade()
しか方法が用意されていないUpgrade()
は競合した時にすぐに関数を抜けるので、Write Lock 同士を待ち合わせることができない 。トランザクションで使えない。
- GitHub - laynelee/rwlock: read write lock that supports try lock for golang
- Try Lock が可能な R/W Lock を提供。
- 内部実装はチャネルを利用。
- ダメなところ
- チャネルによるロックは遅い 3
- 昇格はできない
- GitHub - subchen/go-trylock: TryLock support on read-write lock for Golang
- タイムアウト + Try Lock ができる R/W Lock を提供。
- 内部実装はチャネルを利用。
- ブロードキャストにチャネルの
close()
を利用している。また close したチャネルを入れ替えるためにsync.Mutex
も利用している。 - ダメなところ
- チャネルによるロックは遅い [^3]
- 昇格はできない
- GitHub - BurntSushi/locker: A simple Golang package for conveniently using named read/write locks. Useful for synchronizing access to session based storage in web applications.
- ただ key によって Mutex をルーティングをするだけ
世の中に必要なコンポーネントがないからと、プロダクトを作るのを諦めたり、仕様を捻じ曲げたりするのは嫌いなので、自作しました。
実装
ロックの仕組みと実装方法
Go でのロックの仕組みはこの動画がわかりやすいです。
Mutex は以下の 2 段階に分けてロックが取られています。
- アトミックな操作による spin lock
- goroutine を停止させてキューに詰めて待つ
sync.Mutex
で後者の goroutine 同士の待ち合わせの処理は runtime
の API である runtime_SemacquireMutex()
でおこなっているようですが、これは標準ライブラリである sync
にだけ公開されているため、実質 goroutine 同士のプリミティブな待ち合わせ処理を僕たち一般の開発者が利用することはできません。
そこで、sync.Mutex
や sync.RWMutex
といった標準ライブラリの Mutex コンポーネントを利用して実装することにしました。
注意すること
ここで昇格可能な Read Write Lock の振る舞いとして注意しておくべきことを紹介します。
1 つ目は、Lock() は Upgrade() を妨げない ということです。Upgrade()
を sync.RWMutex
で愚直に実装しようとすると RUnlock()
した後に Lock()
をすることになると思います。
func (m *UMutex) Upgrade() {
m.rwmu.RUnlock()
m.rwmu.Lock()
}
しかしこの時、別の goroutine が Lock()
をして既に取得されている Read Lock が解除されるのを待っているとすると、Upgrade()
の RUnlock()
でロックが横取りされてしまいます。ロックを横取りされないように実装する必要があります。順番は守りましょう。
2 つ目は、複数が同時に Upgrade() するとデッドロックしてしまう ということです。Upgrade()
は自身以外の全てが Read Lock を手放すまで待ち合わせます。しかし、複数が Upgrade()
するということはお互いに Read Lock を手放さないため Dead Lock してしまいます。
品質の低いプロダクトであるなら Dead Lock するようなクエリを発行するユーザの責任だとして、goroutine がリークしようがお構いなしなのかもしれませんが、使い勝手は非常に悪くなります。そのため、デッドロックする場合はタイムアウトや Dead Lock の検出をして自動で失敗させるなどの Developer Friendly な振る舞いをすることが望まれます。
実際のコード
実装はそんなに難しくなくて 51 行とシンプルです。 コミットログをたどるとわかりますが、ちょうど 1 時間で実装していました。(実装開始時の設計では順番抜かしがあることが発覚して途中で設計をやり直しているので時間がかかりました)
前述の注意する点ですが、
- Lock の横取り問題は
UMutex.u
の個数を確認して Upgrade 中であればロックを解放してリトライするようにして解決しました。 - デッドロック問題は、
UMutex.u
をアトミックに操作して同時に Upgrade している個数を管理して、既に Upgrade 待ちをしている場合はデッドロックを検出して Upgrade を失敗させることで解決しました。
短いので実際のコードも貼り付けておきます。参考までに。
package umutex import ( "sync" "sync/atomic" ) // UMutex is simple implementation of Upgradable RWMutex. type UMutex struct { rwmu sync.RWMutex u int32 } // RLock locks shared for multi reader. func (m *UMutex) RLock() { m.rwmu.RLock() } // RUnlock unlocks reader lock. func (m *UMutex) RUnlock() { m.rwmu.RUnlock() } // Lock locks exclusively for single writer. func (m *UMutex) Lock() { lock: m.rwmu.Lock() if atomic.LoadInt32(&m.u) > 0 { // Upgrade is given priority to Lock, retry lock. m.rwmu.Unlock() goto lock } } // Unlock unlocks writer lock. func (m *UMutex) Unlock() { m.rwmu.Unlock() } // Upgrade converts reader lock to writer lock and returns success (true) or dead-lock (false). // If Upgrade by multi reader locker at same time then dead-lock. // Upgrade is given priority to Lock. func (m *UMutex) Upgrade() bool { success := atomic.AddInt32(&m.u, 1) == 1 if success { m.rwmu.RUnlock() m.rwmu.Lock() } atomic.AddInt32(&m.u, -1) return success }
まとめ
これで昇格可能な Read Write Mutex の Go の実装ができました。次回はとうとうトランザクションを実装します。多分 Go 2 のアドベントカレンダー になると思いますが。