去年の12月ぐらいからRubyでMapReduce*1を実装している。
一応、ちゃんと複数のマシンで分散処理ができるところまで実装できたので、今の進捗をまとめておく。
最初は分散処理で動作するものではなく、1台のマシンでマルチスレッドで動作する疑似分散処理の実装を作ってMapReduceのアルゴリズムの理解を深めるのが目的だったが、せっかくなのでちゃんと複数台のマシンで動作するものを作ることにしてみた。
概要
システム全体のアーキテクチャは以下の通り。
とは言っても、実用が目的ではないので以下の制約がある(Combinerはそのうち実装するかも)
- Maptaskは1台のnodeで動作し、Reduectaskは空いているJobWorkerの分だけshuffleして並列に動作する
- Combiner未対応なのでMaptaskのoutputをそのままshuffleする
- shuffleはkeyのhash値を空きJobWorkerの数で割った余りを使う単純なhashパーティションのみ
- MapReduceのinput/output/処理途中の中間データは全てS3に保管し、データのローカリティは一切考慮しない
- 途中で処理が失敗してもリカバリする手段はない
ローカルで動作させている様子をキャプチャしてみたが、なんとなくこれで伝わるだろうか。
S3の代わりにminio*2をローカルで実行し、JobTrackerが1host, JobWorkerが3host起動しており、maptaskが1hostで実行され、その後, reduectaskが2hostで実行されている様子である。
使い方
クラスタの起動の仕方はREADMEに書いたのでこちらを参考していただきたい。
Dockerイメージも用意したので、めんどくさい人は docker-compose up
一発でクラスタを起動できる。
ジョブは、MapタスクとReduceタスクをそれぞれ別クラスで用意すればよい。
なお、自分で用意するのがめんどくさい人向けにCLIにサンプルが入っており、手順だけまとめると、docker-compose up
で起動した場合は以下のようにすればWordCountが動かせる。
$ docker-compose exec job_tracker bundle exec simple_map_reduce generate_lorem_text_data --upload=true $ docker-compose exec job_tracker bundle exec simple_map_reduce execute_word_count
ちなみにここで実行しているWordCountのJobは以下のようなコードである。*3
class WordCount def map(input_data, output_io) input_data.split(' ').each do |raw_word| word = raw_word.strip next if word.empty? word.delete!('_=,.[]()#\'"-=~|&%') word.downcase! output_io.puts({ key: word, value: 1 }.to_json) end end end
require 'json' class WordCount def reduce(input_io, output_io) output = Hash.new(0) count = 0 input_io.each_line(chomp: true, rs: "\n") do |line| input = JSON.parse(line, symbolize_names: true) output[input[:key]] += input[:value] count += 1 if count % 100 == 0 puts "current count: #{count}" end end output.each do |key, value| output_io.puts(JSON.generate(Hash[key, value])) end end end
これをどうやってJobTrackerに渡して実行しているかというと、ソースコードをStringとしてJobTrackerにPOSTし、temporaryなクラスを生成した上でそのクラス内クラスとしてclass_evalして定義する、という力技により実現されている。*4
Hadoopだとjobをjarとして生成してjarをそのままNameNodeに渡すようになっているが、スクリプト言語である以上こうするしか思いつかなかった。。
使ったライブラリなど
sinatra
実はsinatra*5をちゃんと使ったことがなかったので、勉強を兼ねて使ってみた。
sinatra自体は歴史あるプロダクトなので今さら特に語ることもないのだが、1クラス1アプリの単位で実装できるのが結構都合がよかった。今回作った実装ではデータの永続化を一切しておらず、sinatraアプリとして実装したクラスのクラスインスタンス変数にすべて突っ込む力技*6を採用しているのだが、データストアとしての役割も兼任させる上では1クラス1アプリという単位は管理上都合がよかった。
また、rubyのクラスとしてweb appをそのまま実装して起動できるので、rubyからふつうに起動できるのも便利だと思った。railsだと bundle exec rails s
みたいな感じで、普通はシェルスクリプトなどから実行するしか手段がないのだが、sinatraはrubyのスクリプトとして実行する以外の起動手段を持っている。
例えば今回作ったMapReduce実装には管理用のCLIを添付したのだが、このCLIでJobTrackerを起動する部分は以下のように実装している。*7
SimpleMapReduce::Server::JobWorker.run!(port: config.server_port, bind: '0.0.0.0') do SimpleMapReduce::Server::JobWorker.setup_worker end
run!
で起動できるのは割と周知された方法だと思うが、さらにblockを渡すことで起動前に処理を独自のcallbackを追加できる。これはドキュメントを調べても見当たらず、結局sinatraのソースを眺めていて見つけた。*8
また、同様にソースを見ていて発見した例として、sinatraアプリを終了するときに実行されるSinatra::Base.quit!
*9をOverrideすることで、sinatraアプリを終了する時にもcallbackを挟むことができるので、WebRickを終了させる前にWorkerを終了させるのに利用している。*10
# @override def quit! @keep_polling_workers = false @polling_workers_thread.kill job_manager.shutdown_workers! super end
MessagePack
Web APIへデータを渡すときのserializeはいつものようにJSONでいいかなとも思ったが、せっかくなのでMessagePack*11を使ってみた。
独自にTypeを定義すれば自前のクラスもserialize/deserializeできるようだが*12、今回はとりあえずHashとして各種プロパティの値をdumpしたデータをserializeしているだけである。いずれ直接JobやTaskをserialize/deserializeできるようにしてみるのも面白いかもしれない。
Worker Threadの管理
Threadで非同期実行するWorkerを管理をするために別のgemを実装した。
中身を見てもらえば分かるがSidekiq*13っぽいI/Fで指定したクラスをjobとしてキューイングして、Thread poolに入ってるThread群で並列に実行できる、という程度のものである。JavaのConcurrency UtilにあるExecutorServiceみたいなものの超簡易版ぐらいに思ってもらえれば幸いである。
Sidekiq等のJobQueueと異なる点として、外部のデータストレージをキューストアとして使っていない。漢らしいオンメモリキューストアである。もちろんjobの優先度も管理していない。 jobをEnqueueする部分はこんな感じである。*14
module Rasteira module EmbedWorker # Manager class that manages the thread pool and executes jobs. class Manager attr_reader :job_pool def initialize @job_pool = [] @thread_pool = [] @mutex = Mutex.new end # 省略 def enqueue_job!(worker_name, options = {}) @mutex.synchronize do @job_pool << ::Rasteira::Core::Job.new(worker_name, options) end end
データストアが絡まないとここまで実装をシンプルにできるんだなぁと感心した。逆に、普段どれだけ外部システムとの連携に神経をすり減らしているかが良く分かる。
この漢らしい実装のおかげで、JobTrackerやJobWorkerのsinatraアプリとメモリ空間を共有することができるため、jobやworkerなどのオブジェクトの参照をそのまま渡すことができる。
なので、workerからjobやworkerなどのオブジェクトを直接変更することができる。セキュリティもへったくれもないのだが、実用を考えていないのでセキュリティには一旦目をつむりたい。*15
感想
MapReduceそのものの実装は簡単だったのが、その周辺のworker, job, taskを分散環境で管理するのが難しく、分散処理に関する実装に殆どの工数が取られてしまった。
分散処理で動作するプロダクトを作る際に本質的でない実装にものすごく工数がかかるのはGoogleの論文にも書いてあった通りで、まさに身をもって再試をした気分である。
しかし、お陰で何となくHadoopなどの分散処理フレームワークが裏で何をやってくれていて、その恩恵によってどれだけ本質的な処理にだけ集中できるようになったのかが少しは理解できたと思う。 今回作った実装についてはまだいろいろと実験しがいがありそうなので、今後も時間を見てアップデートしていく予定である。 今は分散処理や大規模データ処理の高速化のアルゴリズムについて興味があるので、今後も実装を通じて学んでいきたい所存。
*1:https://research.google.com/archive/mapreduce.html
*2:https://github.com/minio/minio
*3:https://github.com/serihiro/simple_map_reduce/blob/master/exe/simple_map_reduce#L94-L130
*4:https://github.com/serihiro/simple_map_reduce/blob/master/lib/simple_map_reduce/worker/run_map_task_worker.rb#L9-L10
*6:https://github.com/serihiro/simple_map_reduce/blob/master/lib/simple_map_reduce/server/job_tracker.rb#L168-L170
*7:https://github.com/serihiro/simple_map_reduce/blob/master/exe/simple_map_reduce#L30-L32
*8:https://github.com/sinatra/sinatra/blob/master/lib/sinatra/base.rb#L1447-L1448
*9:https://github.com/sinatra/sinatra/blob/master/lib/sinatra/base.rb#L1435
*10:https://github.com/serihiro/simple_map_reduce/blob/master/lib/simple_map_reduce/server/job_tracker.rb#L320-L326
*12:https://github.com/msgpack/msgpack-ruby#extension-types
*13:https://github.com/mperham/sidekiq
*14:https://github.com/serihiro/rasteira/blob/master/lib/rasteira/embed_worker/manager.rb#L55-L59
*15:んなこといったら、Hadoopだってネットワーク的に完全に外部と遮断された安全な環境でしかクラスタ組んで使う前提、と思われるぐらい、デフォルトでは何の認証も入っていない。分散処理界隈はたぶんそういう前提があるもんなのだ。きっと。