
Making concurrency simple, and a multi-threaded downloader in 4 (long) lines

You might not have noticed it, but every page on eigenclass.org lists the most popular referrers. I often find interesting things in the Referer field, but unfortunately they are hard to spot (especially for an occasional visitor) amid inaccessible pages (bloglines, google reader, other online RSS aggregators...) and (as of late) referrer spam.

I'm now filtering referrer URLs as I get them, but I also wanted to purge the historical data contained in the "referrer database". Unsurprisingly, I wrote a script for that.

Filtering referrers entails a fair bit of network traffic, to fetch the referring URLs and verify that they can be accessed and seem legitimate. Performing these checks serially would take forever (establishing the connection, issuing the HTTP request, waiting for the data, timeouts, ...) and I wouldn't be utilizing my bandwidth efficiently.

The obvious solution is to perform several operations in parallel to maximize bandwidth usage.
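The naive form of "in parallel" is one thread per URL with no bound at all. Here's a sketch of that approach; check_url is a hypothetical stand-in for the real network check, so the snippet runs without touching the network:

 # Naive parallelism: one thread per URL, with NO bound on how many run at once.
 # check_url is a stand-in (hypothetical) for the real network check.
 check_url = lambda{|url| url.start_with?("http://") }

 urls = %w[http://a.example/x http://b.example/y http://c.example/z]
 threads = urls.map{|url| Thread.new(url){|u| [u, check_url[u]] } }
 results = threads.map{|t| t.value }

With thousands of referrer URLs this would open thousands of sockets at once, which is why we want a bounded pool instead.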

Pooling handlers

The idea is to create a PoolingExecutor object that assigns tasks to a bounded number of handlers and runs them in separate threads. This way we make the best use of some limited resource (in this case bandwidth, but it could also be DB connections, etc.), since we're not CPU-bound, while avoiding overload.

The API is:

executor = PoolingExecutor.new do |handlers|
  NUM_HANDLERS.times do 
    handlers << SomeHandler.new(stuff)
  end
end

# later

# each task is run in a different thread, but the num of simultaneous
# threads is bounded
executor.run do |handler|
  # perform task with the handler
  # e.g.
  foo( handler.process(stuff) )
end
executor.run do |handler|
  # ....
end
executor.wait_for_all # ensure all the tasks scheduled with executor are
                      # finished
# ....

I've been reusing some old code of mine lately, and PoolingExecutor is a case in point: I wrote it a few years ago. I remember using it to download the whole ruby-talk archives from blade, and for other ad-hoc web spiders and uploaders.

PoolingExecutor manages a pool of handlers, spawning a new thread every time a task is scheduled, and assigning free handlers to the tasks in the waiting list. It relies on Thread, Mutex and ConditionVariable. The latter is used to wake a waiting task once a handler becomes free for immediate use.
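The Mutex/ConditionVariable handoff can be seen in isolation in this minimal sketch (the names are mine, not part of PoolingExecutor; on modern Ruby, Mutex and ConditionVariable are built in, on 1.8 you'd require 'thread' first):

 pool  = []                       # free "handlers"
 mutex = Mutex.new
 avail = ConditionVariable.new

 consumer = Thread.new do
   mutex.synchronize do
     # wait releases the mutex and sleeps until signaled; the guard loop
     # protects against spurious wakeups and lost-signal orderings
     avail.wait(mutex) while pool.empty?
     pool.shift
   end
 end

 mutex.synchronize do
   pool << :handler
   avail.signal                   # wake the waiting consumer, if any
 end

 consumer.value                   # => :handler

Note the wait-in-a-loop: whichever thread wins the race for the mutex first, the consumer only proceeds once the pool is actually non-empty.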

Here's the code:

require 'thread'

class PoolingExecutor
  attr_reader :pool

  def initialize
    @mutex = Mutex.new
    @pool = []
    @handleravail = ConditionVariable.new
    @threads = {}
    yield self if block_given?
  end

  def add_handler(handler)
    @pool << handler 
  end

  alias_method :<<, :add_handler

  def run(logger = nil)
    handler = nil
    ref = []
    @mutex.synchronize do
      if @pool.size == 0
        @handleravail.wait @mutex until @pool.size > 0
      end
      handler = @pool.shift
      @threads[handler] = ref # so it can be removed from @threads in due time
      logger and logger.info("Got handler #{handler.inspect}.")
    end
    ref << Thread.new do
      begin
        logger and logger.info("Yielding handler #{handler.inspect} for execution.")
        yield handler
        logger and logger.info("Finished execution with #{handler.inspect}.")
      ensure
        @mutex.synchronize do
          @pool << handler
          @threads.delete handler
          @handleravail.signal
        end
      end
    end
    ref[0]
  end

  def wait_for_all
    @threads.each do |handler, ref|
      begin
        ref[0].join  # ref holds the Thread running this handler's task
      rescue Exception
        # the task died with an exception; ignore it and keep joining
      end
    end
    @threads.clear
  end

  def num_tasks
    @threads.size
  end
end


if $0 == __FILE__
  executor = PoolingExecutor.new do |handlers|
    10.times do |i|
      handlers << lambda do |x|
        puts "<%2d>: #{x}" % i
        sleep(5 * rand)
      end
    end
  end
  require 'logger'
  logger = Logger.new(STDOUT)
  100.times{|i| executor.run(logger){|handler| handler[i] } }
  executor.wait_for_all
end

Walkthrough

PoolingExecutor#run is of course the most interesting part.

@pool is an array holding free handlers (i.e. ready to be yielded to the block passed to #run), and @threads contains the tasks in execution.

When a new task is scheduled, the calling thread is blocked until a handler is available. This is done with the @handleravail condition variable. The call is

 @handleravail.wait(@mutex)

in order to stop the thread and release @mutex, so that other threads (those executing tasks) can enter the second @mutex.synchronize section and wake the main thread with @handleravail.signal. Before that happens, the child thread will have released its handler, adding it back to @pool, and removed itself from @threads:

 @pool << handler
 @threads.delete handler

The close-up

There's only one tricky thing going on in there... Did you notice this code?

  def run(logger = nil)
    handler = nil
    ref = []
    @mutex.synchronize do
      # ...
      handler = @pool.shift
      @threads[handler] = ref # so it can be removed from @threads in due time
      # ...
    end
    ref << Thread.new do
      # ...
    end
  end

One could think that something like this would suffice:

  def run(logger = nil)
    handler = nil
    @mutex.synchronize do
      if @pool.size == 0
        @handleravail.wait @mutex until @pool.size > 0
      end
      handler = @pool.shift
    end
    thread = Thread.new do
      begin
        yield handler
      ensure
        @mutex.synchronize do
          @pool << handler
          @threads.delete handler
          @handleravail.signal
        end
      end
    end
    @threads[handler] = thread
  end

But if the task were completed quickly enough,

 @threads.delete handler

(in the child thread) would be executed before

 @threads[handler] = thread

in the main thread, leaving a dead thread in @threads: the associated resources wouldn't be reclaimed until #wait_for_all (or #join) gets called.
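Boiled down to a toy sketch (not the executor itself), the fix is to publish a mutable placeholder under the mutex before spawning, so the bookkeeping entry already exists no matter how fast the child thread finishes:

 registry = {}
 ref = []
 registry[:handler] = ref      # registered BEFORE the thread exists
 ref << Thread.new { :done }   # even if this thread finishes instantly,
                               # anyone looking up registry[:handler]
                               # already finds the entry to act on
 registry[:handler][0].join

The child thread and the registry entry are tied together through the shared ref array, so there is no window in which the thread exists but its entry doesn't.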

A simple example

This is a way to do multi-threaded download with PoolingExecutor:

require 'open-uri'
urls = %w[http://whatever.com/file.txt http://blerg.org/bar] +
       (150000..155000).map{|i| "http://foobar.ac.jp/cgi-bin/blergh/#{i}"}

NUM_THREADS = 10
executor = PoolingExecutor.new{|h| NUM_THREADS.times{ h << Object.new } }
urls.each do |url|
  executor.run do |handler|
    # we don't actually need handler; it's just a way to ensure there are at
    # most NUM_THREADS tasks active at a time
    open(url) do |is|
      File.open(File.basename(url), "wb"){|f| f.write is.read(10240) until is.eof? }
    end
  end
end

executor.wait_for_all

In this case the "handlers" don't do anything, but in general they could carry some state (say, a connection). The code could have been condensed into 4 single-statement lines.
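For comparison, a similarly condensed bounded pool can be built on nothing but Ruby's thread-safe Queue, with no PoolingExecutor at all; in this sketch the actual download is replaced by a stand-in computation so it runs anywhere:

 jobs = Queue.new
 (1..20).each{|i| jobs << i }
 results = Queue.new

 workers = 4.times.map do
   Thread.new do
     loop do
       begin
         i = jobs.pop(true)       # non-blocking pop; raises when empty
       rescue ThreadError
         break                    # queue drained, this worker exits
       end
       results << i * i           # stand-in for open(url){|is| ... }
     end
   end
 end
 workers.each{|t| t.join }

The trade-off: a Queue pool drains a fixed job list, whereas PoolingExecutor lets you keep scheduling tasks dynamically and carry per-handler state.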

Further work

The PoolingExecutor does ensure that tasks are run in the same order they're scheduled (due to the way ConditionVariable is implemented), but it'd be interesting to be able to set priorities (e.g. by specifying them in the call to #run)...

What about filtering HTTP Referer fields?

Isn't the above enough code for today? (and this posting long enough for a blog? :)



Last modified:2006/04/14 09:28:15
Keyword(s):[blog] [ruby] [thread] [concurrency] [poolingexecutor] [snippet] [frontpage]
References:[Ruby] [Purging referrer URLs concurrently]