読者です 読者をやめる 読者になる 読者になる

Ruby の Monitor と ConditionVariable の使い方

Ruby の Monitor と ConditionVariable の使い方

なかなかぐぐっても日本語の資料が見つからなかったので自分で動かしてみた。(ぐぐる能力低い)

まずThreadの直列化

require "thread"
require "monitor"

moni = Monitor.new

val = 0

Thread.new {
  3.times {
    puts "thread1 start: #{val}"
    val+=1
    sleep 0.1
    puts "thread1 end: #{val}"
  }
}

Thread.new {
  3.times {
    puts "thread2 start: #{val}"
    val+=1
    sleep 0.1
    puts "thread2 end: #{val}"
  }
}

sleep

結果は以下の通り。
start > end, start > end と一つ一つ処理してほしいのに途中でまざってしまってる。

thread1 start: 0
thread1 end: 1
thread1 start: 1
# もうココでthread2が割り込んでしまっている。。
thread2 start: 1
thread2 end: 3thread1 end: 3
thread1 start: 3

thread2 start: 4
thread1 end: 5
thread2 end: 5
thread2 start: 5
thread2 end: 6

synchronizeをつけて再度実行

require "thread"
require "monitor"

moni = Monitor.new

val = 0

Thread.new {
  moni.synchronize {
    3.times {
      puts "thread1 start: #{val}"
      val+=1
      sleep 0.1
      puts "thread1 end: #{val}"
    }
  }
}

Thread.new {
  sleep 0.1
  moni.synchronize {
    3.times {
      puts "thread2 start: #{val}"
      val+=1
      sleep 0.1
      puts "thread2 end: #{val}"
    }
  }
}

sleep
thread1 start: 0
thread1 end: 1
thread1 start: 1
thread1 end: 2
thread1 start: 2
thread1 end: 3
thread2 start: 3
thread2 end: 4
thread2 start: 4
thread2 end: 5
thread2 start: 5
thread2 end: 6

うまくいったヾ(@^▽^@)ノ

Thread同士のまちあわせ。

Thread同士の処理を待ち合わせるのにループでポールしてもいいんだけど、そういうプログラムって往々にして扱いにくい(気がする)のでモダンなやり方を調べてみた。

require "thread"
require "monitor"

moni = Monitor.new
# thread同士の待ち合わせには ConditionVariableというのを使う
# Monitor#new_cond でそのMonitorにひもづいている 
# ConditionVariable のインスタンスを取得する事ができる
cond = moni.new_cond

thread1 = Thread.new {
  moni.synchronize {
    # ConditionVariable#waitで待ち合わせる。
    # 必ずロックを取得した thread でしか wait は呼べない。
    # つまり以下の二つの状態でした wait  は呼べない
    #   ・Monitor#enter でロック取得した以降 Monitor#exit でロックを解放するまで
    #   ・Monitor#synchronize ブロックの中。(このサンプルだとこっち)
    # 
    # waitを呼ばれた thread は一度 lock を解放し thread を sleep させ
    # 他の Thread に処理を移す。
    cond.wait
    
    # 起こされたここから処理が再開する。
    puts "receive"
  }
}

thread2 = Thread.new {
  sleep 1
  moni.synchronize {
    # condで待ってる他のthreadを起こす。
    cond.signal
  }
}

thread1.join; thread2.join

上記の応用として ThreadPoolをつくってみた

class ThreadPool
  # 渡されたブロックをThreadで処理する
  def initialize(size=5, &block)
    @threads = [] # Threadの格納しておく配列
    @jobs    = [] # blockに渡す引数を格納し、Queueの役割をする。
    @monit   = Monitor.new
    @cond    = @monit.new_cond
    @block   = block # 実行される処理
    @size    = size # 最大Thread数
  end

  def run
    @size.times do
      spawn_thread
    end
  end

  def run_thread
    # synchronize抜けてから処理を実行しないと
    # Threadを使った並列処理ができない。
    # @jobsは全てのthreadで共通のスコープの変数なので
    # synchronizeの外で触るとくちゃくちゃになるので
    # いったんThread固有のローカル変数に格納して
    # このローカル変数を引数にスレッドブロックを呼ぶ。
    job = nil
    loop do
      @monit.synchronize do
        # signalを受け取るたびに block を評価して
        # 真になれば続きの処理を行い、偽であれば再度スリープする。
        @cond.wait_until { @jobs.size > 0 }
        # 真になったthreadのみココにくる
        job = @jobs.pop
      end

      @block.call(job)
    end
  end

  def spawn_thread
    @monit.synchronize do
      @threads << Thread.new(&method(:run_thread))
    end
  end

  def <<(job)
    @monit.synchronize do
      # jobにキューを格納しThreadを一つ起こす
      @jobs << job
      @cond.signal
    end
  end
end

# Threadを5つ作り、スレッドが実行するブロックを渡す。
pool = ThreadPool.new(5) do |arg|
  p arg
  # heavy process
  sleep 3
end

# Threadを準備
pool.run

# 10回キューする
10.times do
  pool << "aa"
end

sleep

あとは、Thread止める処理とか必要だけどまた今度(・o・)ゞ

参考: pumaのThreadPool