Redis を bulk queue として使う(その1)

最近 bulk 処理できるメッセージキューが必要なことがあったので Redis を使ってサックリと作ってみました。
bulk 処理というのは、例えばあるキューを100件ずつとか取得して処理することです。
こうすることで、1件ずつ処理を行うよりもスループットを上げることが可能になるケースがあります。

つかうコマンドは RPUSH、LRANGE、LTRIM の3つだけ。

駄目なケース

処理の流れはこんな感じ

  • だれかしらが enqueue する
    • API とかバッチとか管理ツールとかいろいろあるかもしれない
  • Worker が queue を監視していて、queue が入ってきたら dequeue する
    • Worker は取得できた queue のリストをもろもろ処理する
    • 処理が成功したら queue を削除する

まずは queue をぶっこんでみます。
この時、RPUSH は引数を複数受け付けることができるので bluk で enqueue 可能なので積極的に使っていったほうがよい。

> RPUSH queue 1 2 3 4 5 6 7 8 9 10
(integer) 10

今回は3件ずつ処理することにします

> LRANGE queue 0 2
1) "1"
2) "2"
3) "3"

index は 0 から始まるので 0〜2 = 3件取得できます。
取得できたら Worker がその queue を適切に処理します。

最後に処理した queue を削除します。

> LTRIM queue 3 -1

LTRIM は指定した範囲を残すので、index 3〜最後の要素としていすることで、先程取得した queue が消えます。

> LRANGE queue 0 -1
1) "4"
2) "5"
3) "6"
4) "7"
5) "8"
6) "9"
7) "10"

とすれば確認できますね。

さて、この処理を行う Worker を疑似コードで書くと以下のような感じ。

key       = "queue"
bulk_size = 3

while (1) {
    list      = redis.lrange(key, 0, bulk_size) // dequeue する
    list_size = list.length()                   // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する

    redis.ltrim(key, list_size, -1) // 処理したリストを削除して終了
}

何が駄目なのか

上記の方法では、dequeue した queue を削除するまでに幾つかの処理が挟まっているので、他のプロセスやスレッドなどがある場合に、同じ queue を掴んでしまう可能性が高いです。
queue を取得したら、即座に対象の queue を削除することが求められます。

以下のように変更しましょう

key       = "queue"
bulk_size = 3

while (1) {
    list      = redis.lrange(key, 0, bulk_size) // dequeue する
    redis.ltrim(key, list_size, -1)             // 処理したリストを削除して終了
    list_size = list.length()                   // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これで問題ないでしょうか?
これでも全然だめですね。
というのも、上記のコードの lrange と ltrim は別々に発行されているので、間に別のプロセスやスレッドが挟まるケースが依然存在します。

MULTI - EXEC を利用する

さて、というわけで皆さんおなじみの MULTI - EXEC でくくることで、一連の処理を1つのトランザクションとして実行することが可能になります。

key       = "queue"
bulk_size = 3

while (1) {
    redis.multi()
        redis.lrange(key, 0, bulk_size) // dequeue する
        redis.ltrim(key, list_size, -1) // 処理したリストを削除して終了
    ret       = redis.exec()
    list      = ret[0]        // lrangei() の結結果を取得
    list_size = list.length() // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これでひとまずは大丈夫そうです!やったね!!

Luaスクリプトを利用する

さて、トランザクションを組むことが出来たのでマルチプロセスやマルチスレッドで同じ queue をつかむ心配がなくなりました。
しかし、MUIT - EXEC は実は全てのコマンドのパケットを redis server に逐一送ってしまうのです!!*1

そこで Lua スクリプトですよ!!

Redis には Lua が組み込まれていて、使うことができるんですね。
しかもその Lua スクリプト内で行う処理はすべて同一トランザクションで行われるので MULTI - EXEC をする必要がありません。
また、パケットの送受信も一度で済みます。

実際に例を書きましょう。
Lua スクリプトを実行するには EVAL コマンドを利用します。
例えば、LRANGE を Lua から実行してみます。

> EVAL "return redis.call('LRANGE', KEYS[1], ARGV[1], ARGV[2])" 1 queue 0 -1 
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "7"
8) "8"
9) "9"
10) "10"

こんな感じで値をそのまま返すことができます。

一応、KEYS と ARGV という2種類の値を受け付けることができるのですが、Lua スクリプトを単なるストアドプロシージャーとして捉えると ARGV のみあればよいと思い、ARGV しか使わなくて良い気がします。

Lua スクリプトを書く

さて、スクリプトを書くと言っても一行で書くのはつらいですね。
もちろん改行を入れた普通のスクリプトも実行することが出来ます。
今回のケースだと以下のような感じでしょう

-- dequeue.lua
local key       = ARGV[1]
local bulk_size = ARGV[2]

local list      = redis.call('LRANGE', key, 0, bulk_size - 1)
local list_size = #list;
if list_size == 0 then
    return nil
end

redis.call('LTRIM', key, list_size, -1);
return list

これを redis-cli で実行してみます。ARGV しか利用していないのでちょっと不格好ですが引数にいきなりカンマが出来てきます

$ redis-cli --eval dequeue.lua , queue 3
1) "1"
2) "2"
3) "3"
$ redis-cli --eval dequeue.lua , queue 3
1) "4"
2) "5"
3) "6"
$ redis-cli --eval dequeue.lua , queue 3
1) "7"
2) "8"
3) "9"
$ redis-cli --eval dequeue.lua , queue 3
1) "10"
$ redis-cli --eval dequeue.lua , queue 3
(nil)

上記のように複数回叩いてちゃんと次の queue が取れていることがわかります。
今回は queue がない場合は nil を返すようにしました。

Lua スクリプト Redis に事前に登録しておく

さて、ここまで書いてきましたが、毎回このファイルを読み込んで EVAL を行うのはちょっと無駄です。
そこで、SCRIPT LOAD を使うことで事前にこの Lua スクリプトを Redis に登録しておくことが出来ます。

$ redis-cli SCRIPT LOAD "$(cat dequeue.lua)"
"1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3"

すると SHA1-hex が返ってきます。
これを EVALSHA に食わせることで、このスクリプトを動作させることが可能です。

> RPUSH queue 1 2 3 4 5 6 7 8 9 10 // データを作っておく
> EAVLSHA 1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3 0 queue 3
1) "1"
2) "2"
3) "3"

うまく行きました。
つまり、これで dequeue の処理は

EAVLSHA 1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3 0 queue 3

とするだけです。
パケットの往復も一度しか発生しないためエコですね。

最終形態(エラーハンドリングなし版)

最終的には以下のような感じになるでしょう。

key       = "queue"
bulk_size = 3
script    = "1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3"

while (1) {
    list      = redis.evalsha(script, 0, key, bulk_size) // dequeue する
    list_size = list.length()                            // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これだけです!

残りの課題

さて、ここまでマルチプロセスやマルチスレッドで queue を処理する方法を書いてきましたが、解決していない問題があります。
それは queue の処理が失敗した場合のエラーハンドリングです。

  • dequeue は成功したけど、処理の途中で Worker が落ちてしまって処理が失敗した
  • 一部の queue の処理のみ成功した
  • 全部処理に失敗した

などのケースが考えられます。
上記の様な場合には queue がロストする事になります。

この場合は、Worker の処理の中で上手いことリトライして上げる必要があります。
当然リトライが不可能な queue の場合はログなどを吐いて捨ててしまえばいいですが、リトライ処理はいつも悩ましいものですね。

実際に運用している Worker ではシグナル処理や、上記のリトライの仕組み、そして dequeue して queue が空っぽだったときは interval などを設定しています。

はてさて、その1 というからには その2 があるのだろうか。そろそろ書くの飽きたぞい。

*1:クライアントライブラリーによってはより効率的に送信しているケースもあるかもしれない