以前紹介した MapReduce フレームワーク MyMR の開発を少し進めました.
パッケージとしてのリリースはまだですが.

この記事では変更点等について紹介します.

MyMR の仕組みなどについては前回の記事を参照してください.

ビルダーによる Mapper/Reducer の定義

元々はクラスにメソッドを定義する形で行っていた Mapper/Reducer の定義を, ビルダーを使って定義できるようにしました.

この方法を使った場合, Mapper も Reducer も無名関数だけで定義できるようになったので, よりカジュアルになったと言えるでしょう.

また, クラスベースの定義では, 入出力のテーブルをコマンドラインオプションで指定していましたが, こちらではビルダーで指定できるようになりました.
なので設定ファイルの別出しすることもできるようになっています.

また, コマンドラインで指定した場合はそちらが優先されるので, テスト時はテスト用のデータベースを使う, ということもできます.

無名関数以外でも, call_user_func_array() で実行できるもの (PHP 5.4 で言うところの callable) であれば何でもいいので, インスタンスメソッドなどを Mapper として定義することもできます.

簡単なものは無名関数でサクッと定義してい, 複雑なものはユニットテストされたクラスのメソッドで定義する, といった使い分けができるようになっています.

クラスベースの定義も一応残していますが, ビルダーの方が圧倒的に使いやすいように思えるので, そのうち消すかもしれません.

コマンド名の変更

以前は mymr execute コマンドでクラスベースの MapReduce 定義を実行していましたが, ビルダーによる定義の追加に伴い mymr class コマンドに変更しました.

Emitter の切り出し

Mapper はもともと \MyMR \Base クラスの emit メソッドを $this->emit($key, $value) として読んでいましたが, 無名関数で同様のことをやるには PHP 5.4 以降の Closure::bindTo() を使用する必要が出てきてしまうため, Emitter クラスを別に作り, Mapper に引数として渡すようにしました.

これにより, Mapper のテスト時には Emitter をモックオブジェクトや Test Spy オブジェクトに差し替えてテストする, ということもできるようになりました.

進捗の表示

元々は実行しても無言で終了していましたが, Map と Reduce それぞれで進捗を表示するようにしました.

進捗の表示

進捗の表示

今回 \MyMR \Progress という形で実装しましたが, これ自体の MyMR との結合は緩く, Symfony の Console コンポーネントの OutputInterface にしか依存していないので, 別で公開できればと思っています.

まとめ

遊びで作り始めたものですが, 少しずつ様になって来ました.
まだまだ開発が安定しないので, インターフェイス等は大幅に変わる可能性があると思いますが, 引き続き開発を続けて行きたいと思います.

MyMR については, 3/27 の第58回PHP勉強会@東京で発表する予定なので, 後日スライドを公開する予定です. (今からつくる)

あと, MySQL Casual Talks Vol. 3 の LT 枠でも喋りたいと思っているんですけど, これってもう募集していないんでしょうか…

,

副題: ビッグデータ時代の非ビッグデータ集計戦略

PHP と MySQL を使ってカジュアルに MapReduce する MyMR というものを作ってみました.
とても安直な名前ですね.

yuya-takeyama/mymr – GitHub

とりあえず試してみる

MyMR には, MapReduce のマナー (?) に従って, WordCount するためのサンプルコードとサンプルデータを同梱してみました.

map/reduce 関数は PHP で書かれています.
WordCount.php

MySQL のユーザ名・パスワード等は適宜置き換えて下さい.

見事, 入力テーブル内の単語の出現回数を集計することができました.

MyMR の特徴

  • データの入出力はいずれも MySQL のテーブル
  • 入力と出力のデータベースは同じでもいいし別でもいい
    (入力はプロダクションサービスの Slave サーバで, 出力はデータ集計用の別サーバ, とかいうこともできる)
  • map/reduce 関数を PHP で書く
  • MySQL を意識することは無く, PHP 標準の array にほげほげするだけ
  • 分散/冗長性/耐障害性などについては特に考えていないし, 考える予定も無い
    (エラー処理はちゃんとしたい)
  • 並列処理でなく直列処理
    (並列処理は出来た方がいいと思っている)

まだ「とりあえず動く」程度の状態なので, 色々足りてない状態ではありますが, ギガ/テラバイト級のデータを相手にするのであれば Hadoop とかを使うべきだと思いますし, MyMR はそのような問題を解決しようとはしていません.

モチベーション

  • MySQL に直接 MapReduce 処理を行いたい
  • GROUP BY よりはもっと複雑な処理がやりたい
  • map/reduce を LL で書きたい
  • プログラミングモデルとしての MapReduce を活用したい

DBMS 上で MapReduce をやりたいのであれば MongoDB や CouchDB という選択肢がありますし, map/reduce を LL で書きたいというのであれば Hadoop Streaming という選択肢があるでしょう.
そうなると, やはり一番のモチベーションは MySQL で MapReduce する ということに尽きるように思います.

MongoDB による MapReduce も JavaScript でカジュアルにできてとても便利です.
ですが, 自分の場合, 普段の仕事だと, そもそものデータはほとんど MySQL に入っているので,

  1. mysqldump で csv ファイルを作成
  2. mongoimport で MongoDB にインポート
  3. MongoDB 上で MapReduce

みたいな手順を踏む必要があり, 非常に面倒です.
場合によっては mongoexport で csv に出力してさらに MySQL 上にインポート, なんてことをやることもあって, 正直疲れました.

MySQL で ということの次に重要だと思うのが, プログラミングモデルとしての MapReduce です.
これについては書くと長くなりそうなのでやめておきますが, Haskell のような関数型言語をつまみ食いすることで, この辺りの魅力がわかってきたように思います.
(関数型言語における map 関数と Hadoop/MongoDB/MyMR における Map はちょっと違いますが)
伊藤直也さんの MapReduce::Lite なんかも, その辺りにモチベーションがあって作られたのではないか, と想像しています.

何故 PHP か

特に大した理由はありません.
強いていえば仕事で使いたいと思っていて, 周囲で使われているのが PHP である, というだけの話です.

ついでに挙げるなら, PHP では MySQL ドライバが標準で備わっている, というのもあります.
MyMR では PDO を使っており, 普通に構築された LAMP 環境であれば, まず問題無く使えると思います.

とはいえ, 例えば Bundler の使える環境であればそういった依存関係に悩まされることもありませんし, 正直 Ruby で書き直したい.

MyMR の仕組み

仕組みといえるほど複雑なことはやっていませんが…

1: 入力データの取得

これはとても単純で, mymr コマンドの -i (–input) オプションに渡したテーブルを全件 SELECT するのみです.

今の所全データをガバっとメモリ内に取り込むようになっているので, テーブルサイズに比例してメモリを食います.
非ビッグデータ向けのフレームワークとはいえ, これはさすがにあんまりなので, 何とかするつもりです.

あと, 未実装ですが, WHERE 条件なども指定できれば入力の段階でフィルタできて便利そうなので, やろうと思っています.

2: Map

SELECT で取得した全ての行に対して Map 処理を行います.

MyMR の map 関数では, その中で emit メソッドを実行することで, テンポラリテーブルにデータを INSERT していきます.
関数のプロトタイプは void map(array $record) というシンプルなもので, 1 レコードを表すハッシュ (PHP なので array) を受け取るだけです.

入力テーブルの定義についての制約は特にありません.
通常どんなテーブルでも入力として扱うことができます.

emit にはキーとそれに対応する値を渡すことで, 中間データがテンポラリテーブルに 1 行ずつ挿入されます.
本来は MySQL セッションが終了した時点でテーブルは消えるのですが, 先の WordCount の例でいうと以下のようなテーブルが作成されています.

emit に渡された値は自動的に JSON に変換されています.

JSON 化してるのは構造化データを扱えるようにするためで, この辺りは MongoDB の MapReduce にインスパイアされてます.

3: Shuffle/Sort (?)

Hadoop なんかで言うところの Shuffle/Sort 付近に該当する, と思っているのですが, あんまり自信ないです.
ぶっちゃけると Hadoop 自体を使ったことは無くて, ブログ記事や書籍でつまみ食いした程度の知識しかありません.
(り, りろんはしってる (知らない))

とにかく, ここでは Reduce フェーズの前の下準備として, キーが同一のものをグループ化しています.

MyMR ではカジュアルに GROUP BY と GROUP_CONCAT を使っています.

実際は GROUP_CONCAT の SEPARATOR には改行コードの LF を指定しており, 複数の JSON が改行区切りで連結されたものになります.
要するに複雑なパース無しで複数の JSON を分割できるデリミタであれば何でもよくて, 無難に LF を使っているというだけの話です.

ところで, ここで JSON の代わりに MessagePack を使えれば, それだけである程度高速化できると思うんですが, そのときのデリミタは何にすればいいんでしょう?

4: Reduce

前の Shuffle フェーズで実行したクエリの結果全行に対して, 1 行 1 行 Reduce 処理を行います.

Map と同様, reduce 関数に 1 行 1 行の値を渡していきます.
関数のプロトタイプは array reduce(string $key, array $values) となっています.
$values では GROUP_CONCAT で改行区切りとなっていた JSON を予めパースし, 値の配列として渡されます.

そして返り値のハッシュがそのまま 1 行として -o (–output) に指定されたテーブルに 1 行 1 行挿入されていきます.

また, map と違って単純な入出力モデルになっているので, テストが書きやすいという利点があります.

そしてこれも map と違って, 挿入するデータは JSON ではなく, ハッシュのキーをカラム名としてレコードにマッピングされます.
そのため, 出力テーブルの定義は reduce 処理に合わせておく必要があります.

これまた map と違って, 出力テーブルでは key という名前のカラムが, キーを格納するために予約されており, reduce 返すキーとしては避ける必要があります.
それ以外は自由ですが, key カラムにはユニークインデックスを設定しておくのが良いでしょう.

まとめ

MyMR について長々と書いてきましたが, この記事で一番主張したいのは「MySQL から直接 MapReduce できたら便利じゃないか」ということに尽きます.
そういう需要があってもよさそうなものですが, ググってみても意外とそれらしい記事は見つからず, じゃあ作ってみようということでできたのが MyMR です.

ほとんどプロトタイプみたいなもので, 作り込みはまだまだこれからですが, 「MySQL で MapReduce する」というアイディアを発表したくて, この記事を書いています.

また, 今回はたまたま MySQL でしたが, 入出力を抽象化して MapReduce をするための何かがあれば, いろいろ捗るんじゃないかなぁという妄想もあります.

ビッグデータは無くとも, ビッグデータ時代に生まれた知見を活かすことはできるんじゃないか, というお話でした.

See also

,