Whilst working on improving Poodle’s crawler, I ran into a nasty problem: I had multiple threads all producing and consuming content from a shared “queue”.

Note: This is a slightly weird case of producer and consumer in one. Essentially, you can’t stop processing until all of the input has been consumed and processed, as the last item might generate more content for processing.

The following is my solution:

require 'thread' # Queue and Mutex (needed on older Rubies)

class ThreadPool

  attr_reader :requested, :processed

  def initialize(number_threads, function)
    raise "A thread count of #{number_threads} is less than one" if number_threads < 1
    raise 'Function is nil!' if function.nil?
    @function = function
    @in = Queue.new        # items waiting to be processed
    @out = Queue.new       # results waiting to be collected
    @requested = 0
    @processed = 0
    @lock = Mutex.new      # guards the two counters
    number_threads.times { Thread.new { run } }
  end

  # Blocks until a worker has produced a result, then returns it
  # (or yields it if a block is given).
  def wait
    item = @out.pop
    @lock.synchronize { @processed += 1 }
    block_given? ? (yield item) : item
  end

  # Worker loop: take an item, apply the function, queue the result.
  def run
    loop { @out.push(@function.call(@in.pop)) }
  rescue
    print 'An error occurred: ', $!, "\n"
  end

  # Queue an item for one of the workers to pick up.
  def process(item)
    @lock.synchronize { @requested += 1 }
    @in.push(item)
  end
end
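
To get a feel for the pool on its own, here is a minimal sketch (a made-up example, not part of the crawler) that squares a couple of numbers:

pool = ThreadPool.new(2, lambda { |x| x * x })
pool.process(3)
pool.process(4)
2.times { puts pool.wait }  # prints 9 and 16, in whichever order the workers finish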

The loop becomes:

max_threads = 1
# Note: For real code the lambda must be thread safe (i.e. guard any shared state with a mutex)
f = lambda{|x| ... } # Returning nil means no new work is queued

thread_pool = ThreadPool.new(max_threads, f)
thread_pool.process('hello')

loop do
  items = thread_pool.wait
  items.each {|item| thread_pool.process(item) } if items
  break if (thread_pool.requested == thread_pool.processed)
end
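
For the crawler, the lambda would be something along the lines of the sketch below. This is a hypothetical illustration, not Poodle’s actual code: `crawl`, `seen` and `seen_lock` are names I have made up. It fetches a page and returns any links found, returning nil for URLs it has already visited or failed to fetch, and (per the note above) it keeps its shared state behind a mutex because it runs on several threads:

require 'net/http'
require 'uri'
require 'set'

seen = Set.new          # URLs already handed to the pool
seen_lock = Mutex.new   # the lambda runs on several threads, so guard the set

crawl = lambda do |url|
  begin
    return nil unless seen_lock.synchronize { seen.add?(url) }  # nil if already visited
    html = Net::HTTP.get(URI(url))
    URI.extract(html, %w[http https])  # the links become new items for the pool
  rescue StandardError
    nil  # treat fetch errors as "nothing new to process"
  end
end

Plugging something like `crawl` in as `f` means the driver loop terminates once every discovered link has been fetched and no new ones remain.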