Making concurrency simple, and a multi-threaded downloader in 4 (long) lines
You might not have noticed it, but every page on eigenclass.org lists the most popular referrers. I often find interesting things in the Referer field, but unfortunately they are hard to spot (especially for an occasional visitor) in the middle of inaccessible pages (bloglines, google reader, other online RSS aggregators...) and (as of late) referrer spam.
I'm now filtering referrer URLs as I get them, but I also wanted to purge the historical data contained in the "referrer database". Unsurprisingly, I wrote a script for that.
Filtering referrers entails a fair bit of network traffic, to fetch the referring URLs and verify that they can be accessed and seem legitimate. Performing these checks serially would take forever (establishing the connection, issuing the HTTP request, waiting for the data, timeouts, ...) and I wouldn't be utilizing my bandwidth efficiently.
The obvious solution is performing several operations in parallel to maximize bandwidth usage.
Pooling handlers
The idea is to create a PoolingExecutor object that assigns tasks to a bounded number of handlers and runs them in separate threads. Since we're not CPU-bound, this lets us make the most of some limited resource (in this case bandwidth, but it could also be DB connections, etc.) while avoiding an overload.
The API is:
    executor = PoolingExecutor.new do |handlers|
      NUM_HANDLERS.times do
        handlers << SomeHandler.new(stuff)
      end
    end

    # later

    # each task is run in a different thread, but the num of simultaneous
    # threads is bounded
    executor.run do |handler|
      # perform task with the handler
      # e.g. foo( handler.process(stuff) )
    end

    executor.run do |handler|
      # ....
    end

    executor.wait_for_all   # ensure all the tasks scheduled with executor are
                            # finished
    # ....
I've been reusing some old code of mine lately, and PoolingExecutor is one such piece: I wrote it a few years ago. I remember I used it to download the whole ruby-talk archives from blade, and for other ad-hoc web-spiders and uploaders.
PoolingExecutor manages a pool of handlers, spawning a new thread every time a task is scheduled, and assigning free handlers to the tasks in the waiting list. It relies on Thread, Mutex and ConditionVariable. The latter is used to wake up a task in the waiting list as soon as a handler becomes free.
Here's the code:
    require 'thread'

    class PoolingExecutor
      attr_reader :pool

      def initialize
        @mutex = Mutex.new
        @pool = []
        @handleravail = ConditionVariable.new
        @threads = {}
        yield self if block_given?
      end

      def add_handler(handler)
        @pool << handler
      end
      alias_method :<<, :add_handler

      def run(logger = nil)
        handler = nil
        ref = []
        @mutex.synchronize do
          if @pool.size == 0
            @handleravail.wait @mutex until @pool.size > 0
          end
          handler = @pool.shift
          @threads[handler] = ref # so it can be removed from @threads in due time
          logger and logger.info("Got handler #{handler.inspect}.")
        end
        ref << Thread.new do
          begin
            logger and logger.info("Yielding handler #{handler.inspect} for execution.")
            yield handler
            logger and logger.info("Finished execution with #{handler.inspect}.")
          ensure
            @mutex.synchronize do
              @pool << handler
              @threads.delete handler
              @handleravail.signal
            end
          end
        end
        ref[0]
      end

      def wait_for_all
        @threads.each do |handler, thread|
          begin
            thread[0].join
          rescue Exception
          end
        end
        @threads.clear
      end

      def num_tasks
        @threads.size
      end
    end

    if $0 == __FILE__
      executor = PoolingExecutor.new do |handlers|
        10.times do |i|
          handlers << lambda do |x|
            puts "<%2d>: #{x}" % i
            sleep(5 * rand)
          end
        end
      end
      require 'logger'
      logger = Logger.new(STDOUT)
      100.times{|i| executor.run(logger){|handler| handler[i] } }
      executor.wait_for_all
    end
Walkthrough
PoolingExecutor#run is of course the most interesting part.
@pool is an array holding free handlers (i.e. ready to be yielded to the block passed to #run), and @threads contains the tasks in execution.
When a new task is scheduled, the main thread is blocked until a handler is available. This is done with the @handleravail condition variable. The call is
@handleravail.wait(@mutex)
in order to stop the thread and release @mutex so that other threads (those executing tasks) can enter the second @mutex.synchronize section and wake the main thread with @handleravail.signal. Just before signalling, the child thread releases its handler, adding it back to @pool, and removes itself from @threads:
    @pool << handler
    @threads.delete handler
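The wait/signal handshake can be tried in isolation. The following is only a minimal sketch, not part of PoolingExecutor: the names pool, avail and events are made up for the illustration, and a symbol stands in for a real handler. It shows that wait gives up the mutex while sleeping, so the releasing side can get in, and that re-checking the condition in an until loop makes the outcome the same whichever thread happens to run first.

```ruby
require 'thread'

mutex  = Mutex.new
avail  = ConditionVariable.new
pool   = []   # free handlers; empty at first, so the waiter has to block
events = []   # order in which things happened (illustration only)

waiter = Thread.new do
  mutex.synchronize do
    # wait releases the mutex while sleeping and re-acquires it before
    # returning; the until loop re-checks the condition after every wakeup
    avail.wait(mutex) until pool.size > 0
    events << "got #{pool.shift}"
  end
end

# Plays the part of a task thread handing its handler back.
mutex.synchronize do
  pool << :handler
  events << "released"
  avail.signal
end

waiter.join
puts events.inspect
```

If the releasing block wins the race for the mutex, the waiter finds the pool non-empty and never sleeps at all; otherwise it sleeps until the signal arrives. The loop around wait is what makes both orderings safe.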
The close-up
There's only one tricky thing going on in there... Did you notice this code?
    def run(logger = nil)
      handler = nil
      ref = []
      @mutex.synchronize do
        # ...
        handler = @pool.shift
        @threads[handler] = ref # so it can be removed from @threads in due time
        # ...
      end
      ref << Thread.new do
        # ...
      end
    end
One could think that something like this would suffice:
    def run(logger = nil)
      handler = nil
      @mutex.synchronize do
        if @pool.size == 0
          @handleravail.wait @mutex until @pool.size > 0
        end
        handler = @pool.shift
      end
      thread = Thread.new do
        begin
          yield handler
        ensure
          @mutex.synchronize do
            @pool << handler
            @threads.delete handler
            @handleravail.signal
          end
        end
      end
      @threads[thread] = thread # Hash for O(1) lookup
    end
But if the task were completed quickly enough,
@threads.delete handler
(in the child thread) would be executed before
@threads[thread] = thread
in the main thread, leaving a dead thread in @threads: the associated resources wouldn't be reclaimed until #wait_for_all (or #join) gets called.
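The ordering problem can be reproduced deterministically outside the executor. This is a rough sketch under simplifying assumptions: both registries are keyed by a hypothetical :handler symbol, and a join stands in for "the task completed quickly enough". It compares registering after the thread is created with registering an empty holder first and filling it in afterwards, as #run does with ref.

```ruby
require 'thread'

mutex = Mutex.new

# Naive order: the thread is registered only after it has been created.
naive = {}
t = Thread.new do
  mutex.synchronize { naive.delete(:handler) }   # nothing to delete yet
end
t.join                                            # the task finishes "quickly enough"
mutex.synchronize { naive[:handler] = t }         # registered too late: stale entry

# Order used in #run: register a holder under the lock, fill it in later.
safe = {}
ref  = []
mutex.synchronize { safe[:handler] = ref }
ref << Thread.new do
  mutex.synchronize { safe.delete(:handler) }    # the entry is already there
end
ref[0].join

puts "naive leftovers: #{naive.size}"
puts "safe leftovers:  #{safe.size}"
```

The holder array is what lets the entry exist before the Thread object does, so the cleanup in the child thread always finds something to delete.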
A simple example
This is a way to do multi-threaded download with PoolingExecutor:
    require 'open-uri'

    urls = %w[http://whatever.com/file.txt http://blerg.org/bar] +
      (150000..155000).map{|i| "http://foobar.ac.jp/cgi-bin/blergh/#{i}"}
    NUM_THREADS = 10

    executor = PoolingExecutor.new{|h| NUM_THREADS.times{ h << Object.new } }
    urls.each do |url|
      executor.run do |handler|
        # we don't actually need handler; it's just a way to ensure there are at
        # most NUM_THREADS tasks active at a time
        # (URI.open is needed on current Rubies: plain Kernel#open no longer
        # fetches URLs as of Ruby 3.0)
        URI.open(url) do |is|
          File.open(File.basename(url), "w"){|f| f.write is.read(10240) until is.eof? }
        end
      end
    end
    executor.wait_for_all
In this case the "handlers" don't do anything, but in general they could carry some state (say, a connection). The code could have been condensed into 4 single-statement lines.
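To give an idea of handlers that do carry state, here is a small self-contained sketch. It does not use PoolingExecutor: so that it runs on its own, the pool is a plain Queue, whose pop blocks until a handler is free. The handler type (a Struct with a served counter, standing in for something like a reusable connection) and all the variable names are invented for the example.

```ruby
require 'thread'

# Hypothetical stateful handler: it counts the tasks it has served, the way a
# real handler might keep a connection open between tasks.
counting_handler = Struct.new(:id, :served) do
  def process(item)
    self.served += 1   # safe without a lock: one task holds a handler at a time
    item * 2
  end
end

num_handlers = 3
handlers = Array.new(num_handlers) { |i| counting_handler.new(i, 0) }

pool = Queue.new
handlers.each { |h| pool << h }

results = Queue.new
threads = (1..20).map do |n|
  handler = pool.pop           # blocks until some handler is free
  Thread.new do
    begin
      results << handler.process(n)
    ensure
      pool << handler          # hand it back for the next task
    end
  end
end
threads.each(&:join)

total_served = handlers.sum(&:served)
doubled = Array.new(results.size) { results.pop }.sort
puts "tasks served: #{total_served}"
```

Because a handler is only ever held by one task at a time, its state needs no locking of its own; that is the property the pooling buys you, whether the pool is a Queue as here or the array plus condition variable used in PoolingExecutor.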
Further work
The PoolingExecutor does ensure that tasks are started in the same order they're scheduled (due to the way ConditionVariable is implemented), but it'd be interesting to be able to set priorities (e.g. by specifying them in the call to #run)...
What about filtering HTTP Referer fields?
Isn't the above enough code for today? (and this posting long enough for a blog? :)