Ruby の Monitor と ConditionVariable の使い方
Ruby の Monitor と ConditionVariable の使い方
なかなかぐぐっても日本語の資料が見つからなかったので自分で動かしてみた。(ぐぐる能力低い)
まずThreadの直列化
require "thread" require "monitor" moni = Monitor.new val = 0 Thread.new { 3.times { puts "thread1 start: #{val}" val+=1 sleep 0.1 puts "thread1 end: #{val}" } } Thread.new { 3.times { puts "thread2 start: #{val}" val+=1 sleep 0.1 puts "thread2 end: #{val}" } } sleep
結果は以下の通り。
start > end, start > end と一つ一つ処理してほしいのに途中でまざってしまってる。
thread1 start: 0 thread1 end: 1 thread1 start: 1 # もうココでthread2が割り込んでしまっている。。 thread2 start: 1 thread2 end: 3thread1 end: 3 thread1 start: 3 thread2 start: 4 thread1 end: 5 thread2 end: 5 thread2 start: 5 thread2 end: 6
synchronizeをつけて再度実行
require "thread" require "monitor" moni = Monitor.new val = 0 Thread.new { moni.synchronize { 3.times { puts "thread1 start: #{val}" val+=1 sleep 0.1 puts "thread1 end: #{val}" } } } Thread.new { sleep 0.1 moni.synchronize { 3.times { puts "thread2 start: #{val}" val+=1 sleep 0.1 puts "thread2 end: #{val}" } } } sleep
thread1 start: 0 thread1 end: 1 thread1 start: 1 thread1 end: 2 thread1 start: 2 thread1 end: 3 thread2 start: 3 thread2 end: 4 thread2 start: 4 thread2 end: 5 thread2 start: 5 thread2 end: 6
うまくいったヾ(@^▽^@)ノ
Thread同士のまちあわせ。
Thread同士の処理を待ち合わせるのにループでポールしてもいいんだけど、そういうプログラムって往々にして扱いにくい(気がする)のでモダンなやり方を調べてみた。
require "thread" require "monitor" moni = Monitor.new # thread同士の待ち合わせには ConditionVariableというのを使う # Monitor#new_cond でそのMonitorにひもづいている # ConditionVariable のインスタンスを取得する事ができる cond = moni.new_cond thread1 = Thread.new { moni.synchronize { # ConditionVariable#waitで待ち合わせる。 # 必ずロックを取得した thread でしか wait は呼べない。 # つまり以下の二つの状態でした wait は呼べない # ・Monitor#enter でロック取得した以降 Monitor#exit でロックを解放するまで # ・Monitor#synchronize ブロックの中。(このサンプルだとこっち) # # waitを呼ばれた thread は一度 lock を解放し thread を sleep させ # 他の Thread に処理を移す。 cond.wait # 起こされたここから処理が再開する。 puts "receive" } } thread2 = Thread.new { sleep 1 moni.synchronize { # condで待ってる他のthreadを起こす。 cond.signal } } thread1.join; thread2.join
上記の応用として ThreadPoolをつくってみた
class ThreadPool # 渡されたブロックをThreadで処理する def initialize(size=5, &block) @threads = [] # Threadの格納しておく配列 @jobs = [] # blockに渡す引数を格納し、Queueの役割をする。 @monit = Monitor.new @cond = @monit.new_cond @block = block # 実行される処理 @size = size # 最大Thread数 end def run @size.times do spawn_thread end end def run_thread # synchronize抜けてから処理を実行しないと # Threadを使った並列処理ができない。 # @jobsは全てのthreadで共通のスコープの変数なので # synchronizeの外で触るとくちゃくちゃになるので # いったんThread固有のローカル変数に格納して # このローカル変数を引数にスレッドブロックを呼ぶ。 job = nil loop do @monit.synchronize do # signalを受け取るたびに block を評価して # 真になれば続きの処理を行い、偽であれば再度スリープする。 @cond.wait_until { @jobs.size > 0 } # 真になったthreadのみココにくる job = @jobs.pop end @block.call(job) end end def spawn_thread @monit.synchronize do @threads << Thread.new(&method(:run_thread)) end end def <<(job) @monit.synchronize do # jobにキューを格納しThreadを一つ起こす @jobs << job @cond.signal end end end # Threadを5つ作り、スレッドが実行するブロックを渡す。 pool = ThreadPool.new(5) do |arg| p arg # heavy process sleep 3 end # Threadを準備 pool.run # 10回キューする 10.times do pool << "aa" end sleep
あとは、Thread止める処理とか必要だけどまた今度(・o・)ゞ
参考: pumaのThreadPool