seri::diary

日常

RubyでMapReduceを実装している

去年の12月ぐらいからRubyMapReduce*1を実装している。
一応、ちゃんと複数のマシンで分散処理ができるところまで実装できたので、今の進捗をまとめておく。

github.com

最初は分散処理で動作するものではなく、1台のマシンでマルチスレッドで動作する疑似分散処理の実装を作ってMapReduceアルゴリズムの理解を深めるのが目的だったが、せっかくなのでちゃんと複数台のマシンで動作するものを作ることにしてみた。

概要

システム全体のアーキテクチャは以下の通り。

f:id:serihiro:20180212175809p:plain

とは言っても、実用が目的ではないので以下の制約がある(Combinerはそのうち実装するかも)

  1. Maptaskは1台のnodeで動作し、Reduectaskは空いているJobWorkerの分だけshuffleして並列に動作する
  2. Combiner未対応なのでMaptaskのoutputをそのままshuffleする
  3. shuffleはkeyのhash値を空きJobWorkerの数で割った余りを使う単純なhashパーティションのみ
  4. MapReduceのinput/output/処理途中の中間データは全てS3に保管し、データのローカリティは一切考慮しない
  5. 途中で処理が失敗してもリカバリする手段はない

ローカルで動作させている様子をキャプチャしてみたが、なんとなくこれで伝わるだろうか。
S3の代わりにminio*2をローカルで実行し、JobTrackerが1host, JobWorkerが3host起動しており、maptaskが1hostで実行され、その後, reduectaskが2hostで実行されている様子である。

www.youtube.com

使い方

クラスタの起動の仕方はREADMEに書いたのでこちらを参考していただきたい。 Dockerイメージも用意したので、めんどくさい人は docker-compose up 一発でクラスタを起動できる。

ジョブは、MapタスクとReduceタスクをそれぞれ別クラスで用意すればよい。 なお、自分で用意するのがめんどくさい人向けにCLIにサンプルが入っており、手順だけまとめると、docker-compose upで起動した場合は以下のようにすればWordCountが動かせる。

$ docker-compose exec job_tracker bundle exec simple_map_reduce generate_lorem_text_data --upload=true
$ docker-compose exec job_tracker bundle exec simple_map_reduce execute_word_count

ちなみにここで実行しているWordCountのJobは以下のようなコードである。*3

class WordCount
  def map(input_data, output_io)
    input_data.split(' ').each do |raw_word|
      word = raw_word.strip
      next if word.empty?
      word.delete!('_=,.[]()#\'"-=~|&%')
      word.downcase!

      output_io.puts({ key: word, value: 1 }.to_json)
    end
  end
end
require 'json'
class WordCount
  def reduce(input_io, output_io)
    output = Hash.new(0)
    count = 0
    input_io.each_line(chomp: true, rs: "\n") do |line|
      input = JSON.parse(line, symbolize_names: true)
      output[input[:key]] += input[:value]
      count += 1
      if count % 100 == 0
        puts "current count: #{count}"
      end
    end

    output.each do |key, value|
      output_io.puts(JSON.generate(Hash[key, value]))
    end
  end
end

これをどうやってJobTrackerに渡して実行しているかというと、ソースコードをStringとしてJobTrackerにPOSTし、temporaryなクラスを生成した上でそのクラス内クラスとしてclass_evalして定義する、という力技により実現されている。*4
Hadoopだとjobをjarとして生成してjarをそのままNameNodeに渡すようになっているが、スクリプト言語である以上こうするしか思いつかなかった。。

使ったライブラリなど

sinatra

実はsinatra*5をちゃんと使ったことがなかったので、勉強を兼ねて使ってみた。
sinatra自体は歴史あるプロダクトなので今さら特に語ることもないのだが、1クラス1アプリの単位で実装できるのが結構都合がよかった。今回作った実装ではデータの永続化を一切しておらず、sinatraアプリとして実装したクラスのクラスインスタンス変数にすべて突っ込む力技*6を採用しているのだが、データストアとしての役割も兼任させる上では1クラス1アプリという単位は管理上都合がよかった。

また、rubyのクラスとしてweb appをそのまま実装して起動できるので、rubyからふつうに起動できるのも便利だと思った。railsだと bundle exec rails s みたいな感じで、普通はシェルスクリプトなどから実行するしか手段がないのだが、sinatrarubyスクリプトとして実行する以外の起動手段を持っている。

例えば今回作ったMapReduce実装には管理用のCLIを添付したのだが、このCLIでJobTrackerを起動する部分は以下のように実装している。*7

SimpleMapReduce::Server::JobWorker.run!(port: config.server_port, bind: '0.0.0.0') do
  SimpleMapReduce::Server::JobWorker.setup_worker
end

run! で起動できるのは割と周知された方法だと思うが、さらにblockを渡すことで起動前に処理を独自のcallbackを追加できる。これはドキュメントを調べても見当たらず、結局sinatraのソースを眺めていて見つけた。*8

また、同様にソースを見ていて発見した例として、sinatraアプリを終了するときに実行されるSinatra::Base.quit! *9をOverrideすることで、sinatraアプリを終了する時にもcallbackを挟むことができるので、WebRickを終了させる前にWorkerを終了させるのに利用している。*10

# @override
def quit!
  @keep_polling_workers = false
  @polling_workers_thread.kill
  job_manager.shutdown_workers!
  super
end

MessagePack

Web APIへデータを渡すときのserializeはいつものようにJSONでいいかなとも思ったが、せっかくなのでMessagePack*11を使ってみた。
独自にTypeを定義すれば自前のクラスもserialize/deserializeできるようだが*12、今回はとりあえずHashとして各種プロパティの値をdumpしたデータをserializeしているだけである。いずれ直接JobやTaskをserialize/deserializeできるようにしてみるのも面白いかもしれない。

Worker Threadの管理

Threadで非同期実行するWorkerを管理をするために別のgemを実装した。

github.com

中身を見てもらえば分かるがSidekiq*13っぽいI/Fで指定したクラスをjobとしてキューイングして、Thread poolに入ってるThread群で並列に実行できる、という程度のものである。JavaのConcurrency UtilにあるExecutorServiceみたいなものの超簡易版ぐらいに思ってもらえれば幸いである。

Sidekiq等のJobQueueと異なる点として、外部のデータストレージをキューストアとして使っていない。漢らしいオンメモリキューストアである。もちろんjobの優先度も管理していない。 jobをEnqueueする部分はこんな感じである。*14

module Rasteira
  module EmbedWorker
    # Manager class that manages the thread pool and executes jobs.
    class Manager
      attr_reader :job_pool

      def initialize
        @job_pool = []
        @thread_pool = []
        @mutex = Mutex.new
      end

      # 省略

      def enqueue_job!(worker_name, options = {})
        @mutex.synchronize do
          @job_pool << ::Rasteira::Core::Job.new(worker_name, options)
        end
      end

データストアが絡まないとここまで実装をシンプルにできるんだなぁと感心した。逆に、普段どれだけ外部システムとの連携に神経をすり減らしているかが良く分かる。

この漢らしい実装のおかげで、JobTrackerやJobWorkerのsinatraアプリとメモリ空間を共有することができるため、jobやworkerなどのオブジェクトの参照をそのまま渡すことができる。
なので、workerからjobやworkerなどのオブジェクトを直接変更することができる。セキュリティもへったくれもないのだが、実用を考えていないのでセキュリティには一旦目をつむりたい。*15

感想

MapReduceそのものの実装は簡単だったのが、その周辺のworker, job, taskを分散環境で管理するのが難しく、分散処理に関する実装に殆どの工数が取られてしまった。
分散処理で動作するプロダクトを作る際に本質的でない実装にものすごく工数がかかるのはGoogleの論文にも書いてあった通りで、まさに身をもって再試をした気分である。

しかし、お陰で何となくHadoopなどの分散処理フレームワークが裏で何をやってくれていて、その恩恵によってどれだけ本質的な処理にだけ集中できるようになったのかが少しは理解できたと思う。 今回作った実装についてはまだいろいろと実験しがいがありそうなので、今後も時間を見てアップデートしていく予定である。 今は分散処理や大規模データ処理の高速化のアルゴリズムについて興味があるので、今後も実装を通じて学んでいきたい所存。