Synchronizing Ruby threads with a shared queue
Whilst working on improving Poodle’s crawler, I ran into a nasty problem: I had multiple threads all producing and consuming content from a shared “queue”, and no obvious way to tell when they had finished.
Note: This is a slightly unusual case where each thread is producer and consumer in one. You can’t simply stop when the queue is empty, because a worker may still be processing an item. You have to wait until every item has been both consumed and processed, since the last item might generate more content for processing.
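Stripped of the threads, the stopping rule can be sketched as a plain worklist that counts requested and processed items. This is a minimal single-threaded illustration, assuming a hypothetical `drain` helper and a hypothetical `expand` lambda that returns the follow-up items for an item (or nil). With real threads the queue can look empty while an item is still in flight, which is why the test is on the counts rather than on the queue.

```ruby
# Single-threaded sketch of the stopping rule: keep going until every
# requested item has been processed, because processing an item may
# request more. `expand` is a hypothetical stand-in for the real work.
def drain(start, expand)
  queue = [start]
  requested = 1 # items ever queued
  processed = 0 # items whose expansion has finished
  results = []
  until requested == processed
    item = queue.shift
    results << item
    new_items = expand.call(item) || []
    processed += 1
    requested += new_items.size
    queue.concat(new_items)
  end
  results
end

# Hypothetical expansion: n < 3 yields two children, anything else yields nil.
expand = ->(n) { n < 3 ? [n * 2, n * 2 + 1] : nil }
p drain(1, expand) # => [1, 2, 3, 4, 5]
```

Note that item 5 is queued by item 2 only after item 2 has been processed; stopping on "nothing left right now" at the wrong moment would lose it.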
The following is my solution:
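require 'thread' # Queue and Mutex (built in since Ruby 1.9; harmless to require)

class ThreadPool
  attr_reader :requested, :processed

  def initialize(number_threads, function)
    raise ArgumentError, "A thread count of #{number_threads} is less than one" if number_threads < 1
    raise ArgumentError, 'Function is nil!' if function.nil?
    @function = function
    @in = Queue.new  # items waiting to be processed
    @out = Queue.new # results waiting to be collected by #wait
    @requested = 0
    @processed = 0
    @lock = Mutex.new
    number_threads.times { Thread.new { run } }
  end

  # Block until one result is available; yield it if a block is given, else return it.
  def wait
    item = @out.pop
    @lock.synchronize { @processed += 1 }
    block_given? ? yield(item) : item
  end

  # Queue an item for processing by the worker threads.
  def process(item)
    @lock.synchronize { @requested += 1 }
    @in.push(item)
  end

  private

  # Worker loop: take an item, run the function, hand back the result.
  # Errors are rescued per item and reported as nil, so a failing item still
  # counts as processed and the worker stays alive (otherwise #wait would block forever).
  def run
    loop do
      item = @in.pop
      result =
        begin
          @function.call(item)
        rescue StandardError => e
          warn "An error occurred: #{e.message}"
          nil
        end
      @out.push(result)
    end
  end
end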
The main loop then becomes:
max_threads = 1
# Note: f runs on the worker threads, so in real code any shared state it
# touches must be thread safe (i.e. guarded by a Mutex).
f = lambda { |x| ... } # Returns an array of new items to process, or nil if there are none
thread_pool = ThreadPool.new(max_threads, f)
thread_pool.process('hello')
loop do
  items = thread_pool.wait
  items.each { |item| thread_pool.process(item) } if items
  # Done once every requested item has produced a result and nothing new was queued
  break if thread_pool.requested == thread_pool.processed
end
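The thread-safety note on f matters once max_threads is above one, because f then runs on several worker threads at the same time. As a sketch of what such a function might look like, assuming a hypothetical in-memory `LINKS` hash instead of real page fetches and a hypothetical `seen` set (neither is part of Poodle as described here): the shared set is guarded by a Mutex, and f returns only links that no other thread has already claimed, or nil.

```ruby
require 'set'

# Hypothetical in-memory link graph standing in for real page fetches.
LINKS = {
  'hello' => %w[a b],
  'a'     => %w[b c],
  'b'     => %w[a c],
  'c'     => []
}.freeze

seen = Set.new(['hello']) # pages already queued; shared by all workers
seen_lock = Mutex.new     # guards every read and write of `seen`

# Returns the links on `page` that nobody has queued yet, or nil if none.
f = lambda do |page|
  links = LINKS.fetch(page, [])
  fresh = seen_lock.synchronize do
    # Check and claim under one lock so two threads never claim the same link.
    # Set#add? returns nil when the link is already present.
    links.select { |link| seen.add?(link) }
  end
  fresh.empty? ? nil : fresh
end

# Expand the same pages from several threads at once.
threads = %w[hello a b hello a b].map { |page| Thread.new { f.call(page) || [] } }
claimed = threads.flat_map(&:value)
p claimed.sort # => ["a", "b", "c"]
```

Whatever order the threads run in, each link is handed out exactly once. Without the seen check, the cycle between a and b would keep generating work and the requested count would never be matched by the processed count. A lambda along these lines could stand in for the elided f in the loop above.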