boom を使って Datastore のページングで末尾を判定する方法

普通の Web アプリケーションのページングでもほぼ同様ですが、バッチ処理を想定したコードです。
Datastore では Cursor が返ってくるので、MySQLなどでよくある 1件多くとってその id を返す的な方法が使えないので、Count() を投げたとしても運が悪いと limit ぴったりの件数が返ってきてしまって1回多くクエリーを投げないといけない可能性が出てくる。
さらにいうと、Count()自体がクエリーを投げるのでやめたい。

終端まで行くと、最後になげた Cursor と同じ Cursor が返ってくることを確認した(Test コードを書いたり、実際の GAE 上でも動作した)ので、多分これが一番シンプル?
ただ、Undocumented っぽいのでより良い方法知っているひといたら教えてください!
ちなみに、同じ Cursor じゃなくて、EOF 的な err で返してくれたらより嬉しかった。
Filter とかは適当です。
goon とか datastore そのまま使うのでも一緒だと思います。

func NewBoom(ctx context.Context) *boom.Boom {
	ds, err := aedatastore.FromContext(ctx)
	if err != nil {
		panic(err)
	}
	return boom.FromClient(ctx, ds)
}

func GetItems(ctx context.Context, category String, limit int) ([]*Item, error) {
	items := make([]*Item, 0)

	b := NewBoom(ctx)
	cursorStr := ""
	for {
		q := b.NewQuery(b.Kind(Item{})).Filter("Category=", category).Limit(limit)
		if cursorStr != "" {
			cursor, err := b.DecodeCursor(cursorStr)
			if err != nil {
				return nil, err
			}
			q = q.Start(cursor)
		}

		itr := b.Run(q)
		for {
			i := &Item{}
			_, err := itr.Next(i)
			if err != nil {
				if err != iterator.Done {
					return nil, err
				}
				break
			}

			items = append(items, i)
		}

		nc, err := itr.Cursor()
		if err != nil {
			return nil, err
		}

		// 同じカーソルが帰ってきたら終端
		if cursorStr == nc.String() {
			break
		}

		cursorStr = nc.String()
	}

	return items, nil
}

YAPC::Tokyo 2019 にスピーカーとして参加しました

去る 2019/01/26 に開催された YAPC::Tokyo 2019 にスピーカーとして参加しました。

yapcjapan.org

自分は Perl to Go というタイトルで発表しました。
資料は以下になります。

40分枠の発表でしたが、スライドを作っている段階で「あ、これ90分のヤツや...」と気付いていましたが時は無常でもうどうしようもありませんでした。
案の定、半分以下しか話すことができず、いろいろとすいませんでした><
また、スライドを作り始めたのが YAPC の3日前だったのでかなり準備不足であり、反省しております。

2月後半ぐらいまでにはスライドのアップデートおよび、サンプルコードの公開をしたいなーと思ってはいるので、もし気になる人がいれば気長にお待ちいただければと。

YAPC に参加した感想としましては、年々若い人が増えていて海外とだいぶ違うぞという印象があり、おじさんも体力が減ってきたのか主張が減っている感じなのでこの調子で若い人たちが牽引していくコミュニティになっていくと面白いな〜と思ったりもしました。

あとは Charsbar さんの発表で、いろいろと問題のあるモジュールが CPAN に放置されているぞ!っていう話がありまして思い当たる節もありまくるので毎度のことながらそろそろなんとかせんとなあという気持ちになっています。

また、今年の YAPC ではなぜか乾杯の挨拶をさせていただきました。謎ですね。

次回の YAPC も楽しみですね!!

Released Net::APNs::HTTP2 0.02

APNs を簡単に送れる Net::APNs::HTTP2 の 0.02 をリリースしました。

metacpan.org

いままでは内部で使っている Crypt::JWT が使っている CryptX 内の Crypt::PK::ECC が APNs の pkcs8 ファイルをそのまま扱えなくて opnessl コマンドなどで変換しなくてはいけなかったのですが、CryptX が3月頃に対応したという issue が上がっていたので cpanfile にバージョンぶっこんでおきました。

今後は Developer Center からダウンロードした pkcs8 ファイルをそのまま使えるので便利ですね。

JSON::XS 4.0 では incompatible な変更がいくつかあるので注意

最近 JOSN::XS 4.0 が出たわけですが、Changes が以下のようになっております。

Changes for version 4.0 - 2018-11-16
・4.0 pre-release, do not use other than for testing.
・SECURITY IMPLICATION: this release enables allow_nonref by default for compatibnility with RFC 7159 and newer. See "old" vs. "new" JSON under SECURITY CONSIDERATIONS.
・reworked the "old" vs. "new" JSON section.
・add ->boolean_values to provide the values to which booleans decode (requested by Aristotle Pagaltzis).
・decode would wrongly accept ASCII NUL characters instead of reporting them as trailing garbage.
・work around what smells like a perl bug w.r.t. exceptions thrown in callbacks.
・incremental parser now more or less respects allow_nonref.
json_xs json-pretty now enables canonical mode.
・add documentation section about I-JSON.
・minor documentation fixes/updates.

まだ pre-release だから test 目的以外では使うなよって書いてありますね。developer release にしなかったのはなんでやろ(教えてエロい人!)
この変更では、RFC 7159 に沿うように、encode 時に `allow_nonref` がデフォルトで有効になりました。

いままでは以下のようなパターンではエラーになっていましたが、動作するようになっています。

encode_json("hoge") # ok
JSON->new->utf8->encode("fuga") # ok

特筆すべきは、`JSON->new` の時点で有効になっているということでしょう。
オフにするには明示的に

JSON->new->allow_nonref(0)

とする必要があります。

`decode_json` では今まで通りにエラーになるので、そこまで問題にならないと思いますが、JSON::XS 4.0 以降では気をつける必要がありそうです。

golang で AWS Lambda から IAM 認証を利用して RDS に接続するのにめっちゃハマった話

2017年5月ぐらいから IAM 認証で RDS の MySQL or Aurora に接続できるようになっていて、これを利用すれば Lambda から高速にデータベースアクセスができてハッピーになれそうということで試した。
で、aws-sdk-go をつかってアクセスしようとしたけど

Error 1045: Access denied for user 'iam_user'@'ec2-xx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com' (using password: YES)

となって全然繋がらなかった。

コードは以下のような感じ

package main

import (
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/service/rds/rdsutils"

	"fmt"

	"crypto/x509"
	"io/ioutil"

	"crypto/tls"

	"database/sql"

	"github.com/go-sql-driver/mysql"
)

var certFile = "./rds-combined-ca-bundle.pem"
var dbEndpoint = "foo.bar.ap-northeast-1.rds.amazonaws.com"
var dbUser = "iam_user"
var dbName = "iamtest"
var dbPort = "3306"
var awsRegion = "ap-northeast-1"

func RDSConnect() (string, error) {
	awsCredentials := credentials.NewEnvCredentials()
	authToken, err := rdsutils.BuildAuthToken(
		dbEndpoint,
		awsRegion,
		dbUser,
		awsCredentials,
	)
	if err != nil {
		panic(err.Error())
	}
	dsnStr := fmt.Sprintf(
		"%s:%s@tcp(%s:%s)/%s?tls=true&allowCleartextPasswords=true",
		dbUser, authToken, dbEndpoint, dbPort, dbName,
	)

	db, err := sql.Open("mysql", dsnStr)
	if err != nil {
		panic(err.Error())
	}
	defer db.Close()

	return "ok", nil
}

func RegisterTLSConfig() {
	rootCertPool := x509.NewCertPool()
	pem, err := ioutil.ReadFile(certFile)
	if err != nil {
		panic(err.Error())
	}
	if ok := rootCertPool.AppendCertsFromPEM(pem); !ok {
		panic(err.Error())
	}
	err = mysql.RegisterTLSConfig("custom", &tls.Config{
		RootCAs: rootCertPool,
	})
	if err != nil {
		panic(err.Error())
	}
}

func main() {
	RegisterTLSConfig()
	lambda.Start(RDSConnect)
}

結論から言うと、rdsutils.BuildAuthToken の引数の dbEndopoint は hostname:port の形式で書かないとダメ。

	authToken, err := rdsutils.BuildAuthToken(
		fmt.Sprintf("%s:%s", dbEndpoint, dbPort),
		awsRegion,
		dbUser,
		awsCredentials,
	)

とすれば動く。

そしていまドキュメントを再読したら

rdsutils - Amazon Web Services - Go SDK

Endpoint consists of the hostname and port, IE hostname:port, of the RDS database.

とバッチリ書いてあった!!
と言うわけでめちゃくちゃハマった話でした。

その他の注意点

  • RDS への IAM 認証は 1秒間に新規で 20接続までしかできない
  • rds-db-connect という特殊な IAM ポリシーを作って Lambda にアタッチしなければならない
  • データベース上で、AWSAuthenticationPlugin を有効にしたユーザーを作らなければならない
  • TLS 接続が必須で、rds-combined-ca-bundle.pem を指定しなければならない

などなど色々と大変。
接続数20はハードリミットで変更できないらしく、現状はかなり限定的な要件でしか使えないなーという印象です。

詳しくは以下を参照のこと

docs.aws.amazon.com

Redis を bulk queue として使う(その1)

最近 bulk 処理できるメッセージキューが必要なことがあったので Redis を使ってサックリと作ってみました。
bulk 処理というのは、例えばあるキューを100件ずつとか取得して処理することです。
こうすることで、1件ずつ処理を行うよりもスループットを上げることが可能になるケースがあります。

つかうコマンドは RPUSH、LRANGE、LTRIM の3つだけ。

駄目なケース

処理の流れはこんな感じ

  • だれかしらが enqueue する
    • API とかバッチとか管理ツールとかいろいろあるかもしれない
  • Worker が queue を監視していて、queue が入ってきたら dequeue する
    • Worker は取得できた queue のリストをもろもろ処理する
    • 処理が成功したら queue を削除する

まずは queue をぶっこんでみます。
この時、RPUSH は引数を複数受け付けることができるので bluk で enqueue 可能なので積極的に使っていったほうがよい。

> RPUSH queue 1 2 3 4 5 6 7 8 9 10
(integer) 10

今回は3件ずつ処理することにします

> LRANGE queue 0 2
1) "1"
2) "2"
3) "3"

index は 0 から始まるので 0〜2 = 3件取得できます。
取得できたら Worker がその queue を適切に処理します。

最後に処理した queue を削除します。

> LTRIM queue 3 -1

LTRIM は指定した範囲を残すので、index 3〜最後の要素としていすることで、先程取得した queue が消えます。

> LRANGE queue 0 -1
1) "4"
2) "5"
3) "6"
4) "7"
5) "8"
6) "9"
7) "10"

とすれば確認できますね。

さて、この処理を行う Worker を疑似コードで書くと以下のような感じ。

key       = "queue"
bulk_size = 3

while (1) {
    list      = redis.lrange(key, 0, bulk_size) // dequeue する
    list_size = list.length()                   // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する

    redis.ltrim(key, list_size, -1) // 処理したリストを削除して終了
}

何が駄目なのか

上記の方法では、dequeue した queue を削除するまでに幾つかの処理が挟まっているので、他のプロセスやスレッドなどがある場合に、同じ queue を掴んでしまう可能性が高いです。
queue を取得したら、即座に対象の queue を削除することが求められます。

以下のように変更しましょう

key       = "queue"
bulk_size = 3

while (1) {
    list      = redis.lrange(key, 0, bulk_size) // dequeue する
    redis.ltrim(key, list_size, -1)             // 処理したリストを削除して終了
    list_size = list.length()                   // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これで問題ないでしょうか?
これでも全然だめですね。
というのも、上記のコードの lrange と ltrim は別々に発行されているので、間に別のプロセスやスレッドが挟まるケースが依然存在します。

MULTI - EXEC を利用する

さて、というわけで皆さんおなじみの MULTI - EXEC でくくることで、一連の処理を1つのトランザクションとして実行することが可能になります。

key       = "queue"
bulk_size = 3

while (1) {
    redis.multi()
        redis.lrange(key, 0, bulk_size) // dequeue する
        redis.ltrim(key, list_size, -1) // 処理したリストを削除して終了
    ret       = redis.exec()
    list      = ret[0]        // lrangei() の結結果を取得
    list_size = list.length() // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これでひとまずは大丈夫そうです!やったね!!

Luaスクリプトを利用する

さて、トランザクションを組むことが出来たのでマルチプロセスやマルチスレッドで同じ queue をつかむ心配がなくなりました。
しかし、MUIT - EXEC は実は全てのコマンドのパケットを redis server に逐一送ってしまうのです!!*1

そこで Lua スクリプトですよ!!

Redis には Lua が組み込まれていて、使うことができるんですね。
しかもその Lua スクリプト内で行う処理はすべて同一トランザクションで行われるので MULTI - EXEC をする必要がありません。
また、パケットの送受信も一度で済みます。

実際に例を書きましょう。
Lua スクリプトを実行するには EVAL コマンドを利用します。
例えば、LRANGE を Lua から実行してみます。

> EVAL "return redis.call('LRANGE', KEYS[1], ARGV[1], ARGV[2])" 1 queue 0 -1 
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "7"
8) "8"
9) "9"
10) "10"

こんな感じで値をそのまま返すことができます。

一応、KEYS と ARGV という2種類の値を受け付けることができるのですが、Lua スクリプトを単なるストアドプロシージャーとして捉えると ARGV のみあればよいと思い、ARGV しか使わなくて良い気がします。

Lua スクリプトを書く

さて、スクリプトを書くと言っても一行で書くのはつらいですね。
もちろん改行を入れた普通のスクリプトも実行することが出来ます。
今回のケースだと以下のような感じでしょう

-- dequeue.lua
local key       = ARGV[1]
local bulk_size = ARGV[2]

local list      = redis.call('LRANGE', key, 0, bulk_size - 1)
local list_size = #list;
if list_size == 0 then
    return nil
end

redis.call('LTRIM', key, list_size, -1);
return list

これを redis-cli で実行してみます。ARGV しか利用していないのでちょっと不格好ですが引数にいきなりカンマが出来てきます

$ redis-cli --eval dequeue.lua , queue 3
1) "1"
2) "2"
3) "3"
$ redis-cli --eval dequeue.lua , queue 3
1) "4"
2) "5"
3) "6"
$ redis-cli --eval dequeue.lua , queue 3
1) "7"
2) "8"
3) "9"
$ redis-cli --eval dequeue.lua , queue 3
1) "10"
$ redis-cli --eval dequeue.lua , queue 3
(nil)

上記のように複数回叩いてちゃんと次の queue が取れていることがわかります。
今回は queue がない場合は nil を返すようにしました。

Lua スクリプト Redis に事前に登録しておく

さて、ここまで書いてきましたが、毎回このファイルを読み込んで EVAL を行うのはちょっと無駄です。
そこで、SCRIPT LOAD を使うことで事前にこの Lua スクリプトを Redis に登録しておくことが出来ます。

$ redis-cli SCRIPT LOAD "$(cat dequeue.lua)"
"1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3"

すると SHA1-hex が返ってきます。
これを EVALSHA に食わせることで、このスクリプトを動作させることが可能です。

> RPUSH queue 1 2 3 4 5 6 7 8 9 10 // データを作っておく
> EAVLSHA 1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3 0 queue 3
1) "1"
2) "2"
3) "3"

うまく行きました。
つまり、これで dequeue の処理は

EAVLSHA 1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3 0 queue 3

とするだけです。
パケットの往復も一度しか発生しないためエコですね。

最終形態(エラーハンドリングなし版)

最終的には以下のような感じになるでしょう。

key       = "queue"
bulk_size = 3
script    = "1a87bcfb1aaebb96ee92b96a2b60dd8192368ab3"

while (1) {
    list      = redis.evalsha(script, 0, key, bulk_size) // dequeue する
    list_size = list.length()                            // bulk_size 未満の可能性もあるので、list の size を取得しておく

    // ここでえられた queue のリストを処理する
}

これだけです!

残りの課題

さて、ここまでマルチプロセスやマルチスレッドで queue を処理する方法を書いてきましたが、解決していない問題があります。
それは queue の処理が失敗した場合のエラーハンドリングです。

  • dequeue は成功したけど、処理の途中で Worker が落ちてしまって処理が失敗した
  • 一部の queue の処理のみ成功した
  • 全部処理に失敗した

などのケースが考えられます。
上記の様な場合には queue がロストする事になります。

この場合は、Worker の処理の中で上手いことリトライして上げる必要があります。
当然リトライが不可能な queue の場合はログなどを吐いて捨ててしまえばいいですが、リトライ処理はいつも悩ましいものですね。

実際に運用している Worker ではシグナル処理や、上記のリトライの仕組み、そして dequeue して queue が空っぽだったときは interval などを設定しています。

はてさて、その1 というからには その2 があるのだろうか。そろそろ書くの飽きたぞい。

*1:クライアントライブラリーによってはより効率的に送信しているケースもあるかもしれない