kotp
4/3/2013 - 5:08 AM

0-README.md

require 'thread'

class TwoLockLinkedQueue
  # This Node doesn't need atomic updates, it assumes
  # that you're modifying it while holding a lock
  Node = Struct.new(:item, :successor)

  def initialize
    dummy_node = Node.new(:dummy, nil)

    @head_node = dummy_node
    @tail_node = dummy_node

    @head_lock = Mutex.new
    @tail_lock = Mutex.new
  end

  def push(item)
    # allocate a new node with the item embedded
    new_node = Node.new(item, nil)

    @tail_lock.synchronize do
      # update the successor of the current tail to point to the new node
      @tail_node.successor = new_node
      @tail_node = new_node
    end
  end

  def pop
    @head_lock.synchronize do
      dummy_node = @head_node
      head = @head_node.successor

      if head.nil? # then queue was empty
        return nil
      else
        # the current head becomes the new 'dummy' head
        @head_node = head
        # return its value
        return head.item
      end
    end
  end

  def size
    successor = @head_node.successor
    count = 0

    loop do
      break if successor.nil?

      current_node = successor
      successor = current_node.successor
      count += 1
    end

    count
  end
end
require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'

thread_count = 50

queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]
Thread.abort_on_exception = true

# this one tells all the threads when to start
$go = false

def setup(queue, thread_count, queue_length)
  tg = ThreadGroup.new

  if queue.class == Queue
    thread_count.times do
      t = Thread.new do
        # wait until the bm starts to do the work. This should
        # minimize variance.
        nil until $go
        (queue_length / thread_count).to_i.times do
          queue.push('item')
        end

        loop do
          begin
            result = queue.pop(:nonblock)
          rescue => e
            break
          end
        end
      end

      tg.add(t)
    end

  else
    thread_count.times do
      t = Thread.new do
        nil until $go
        (queue_length / thread_count).to_i.times do
          queue.push('item')
        end

        result = true
        until result.nil?
          result = queue.pop
        end
      end

      tg.add(t)
    end
  end

  GC.start
  tg
end

def exercise(tg)
  $go = true
  tg.list.each(&:join)
  $go = false
end

3.times do
  queue_klasses.each do |klass|
    Benchmark.bm(45) do |bm|
      queue = klass.new
      tg = setup(queue, thread_count, 10_000)
      bm.report("#{klass} - fill, then empty - 10k") { exercise(tg) }
      raise 'hell' unless queue.size.zero?
      
      queue = klass.new
      tg = setup(queue, thread_count, 100_000)
      bm.report("#{klass} - fill, then empty - 100k") { exercise(tg) }
      raise 'hell' unless queue.size.zero?

      queue = klass.new
      tg = setup(queue, thread_count, 1_000_000)
      bm.report("#{klass} - fill, then empty - 1mm") { exercise(tg) }
      raise 'hell' unless queue.size.zero?
    end
  end
end

require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'

thread_count = 50
iterations = 10_000

queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]
Thread.abort_on_exception = true

# this one tells all the threads when to start
$go = false

def setup(queue, writer_thread_count, reader_thread_count, iterations)
  tg = ThreadGroup.new
  
  # spawn writer threads
  writer_thread_count.times do
    t = Thread.new do
      # wait until the bm starts to do the work. This should
      # minimize variance.
      nil until $go
      iterations.times do
        queue.push('item')
      end
    end

    tg.add(t)
  end
  
  # spawn reader threads
  if queue.class == Queue
    # the Queue class gets special behaviour because its #pop
    # method is blocking by default.
    reader_thread_count.times do
      t = Thread.new do
        nil until $go
        iterations.times do
          begin
            queue.pop(:nonblocking)
          rescue
          end
        end
      end

      tg.add(t)
    end
  else
    reader_thread_count.times do
      t = Thread.new do
        nil until $go
        iterations.times do
          queue.pop
        end
      end

      tg.add(t)
    end
  end
  
  tg
end

def exercise(tg)
  $go = true
  tg.list.each(&:join)
  $go = false
end

3.times do
  queue_klasses.each do |klass|
    Benchmark.bm(50) do |bm|
      queue = klass.new
      tg = setup(queue, thread_count, (thread_count * 0.6).to_i, iterations)
      bm.report("#{klass} - more writers than readers") { exercise(tg) }

      queue = klass.new
      tg = setup(queue, (thread_count * 0.6).to_i, thread_count, iterations)
      bm.report("#{klass} - more readers than writers") { exercise(tg) }

      queue = klass.new
      tg = setup(queue, thread_count, thread_count, iterations)
      bm.report("#{klass} - equal writers and readers") { exercise(tg) }
    end
  end
end

# This is a proof-of-concept of a concurrent, lock-free FIFO data 
# structure written in Ruby. It leverages atomic updates, rather than
# lock-based synchronization.

require 'atomic'

class AtomicLinkedQueue
  class Node
    attr_accessor :item

    def initialize(item, successor)
      @item = item
      @successor = Atomic.new(successor)
    end

    def successor
      @successor.value
    end

    def update_successor(old, new)
      @successor.compare_and_set(old, new)
    end
  end

  def initialize
    dummy_node = Node.new(:dummy, nil)

    @head = Atomic.new(dummy_node)
    @tail = Atomic.new(dummy_node)
  end

  def push(item)
    # allocate a new node with the item embedded
    new_node = Node.new(item, nil)

    # keep trying until the operation succeeds
    loop do
      current_tail_node = @tail.value
      current_tail_successor = current_tail_node.successor

      # if our stored tail is still the current tail
      if current_tail_node == @tail.value
        # if that tail was really the last node
        if current_tail_successor.nil?
          # if we can update the previous successor of tail to point to this new node
          if current_tail_node.update_successor(nil, new_node)
            # then update tail to point to this node as well
            @tail.compare_and_set(current_tail_node, new_node)
            # and return
            return true
            # else, start the loop over
          end
        else
          # in this case, the tail ref we had wasn't the real tail
          # so we try to set its successor as the real tail, then start the loop again
          @tail.compare_and_set(current_tail_node, current_tail_successor)
        end
      end
    end
  end

  def pop
    # retry until some value can be returned
    loop do
      # the value in @head is just a dummy node that always sits in that position,
      # the real 'head' is in its successor
      current_dummy_node = @head.value
      current_tail_node = @tail.value

      current_head_node = current_dummy_node.successor

      # if our local head is still consistent with the head node, continue
      # otherwise, start over
      if current_dummy_node == @head.value
        # if either the queue is empty, or falling behind
        if current_dummy_node == current_tail_node
          # if there's nothing after the 'dummy' head node
          if current_head_node.nil?
            # just return nil
            return nil
          else
            # here the head element succeeding head is not nil, but the head and tail are equal
            # so tail is falling behind, update it, then start over
            @tail.compare_and_set(current_tail_node, current_head_node)
          end
        # the queue isn't empty
        # if we can set the dummy head to the 'real' head, we're free to return the value in that real head, success
        elsif @head.compare_and_set(current_dummy_node, current_head_node)
          # grab the item from the popped node
          item = current_head_node.item

          if item != nil
            current_head_node.item = nil
          end
          
          # return it, success!
          return item

        # else
          # try again
        end
      end
    end
  end

  def size
    successor = @head.value.successor
    count = 0

    loop do
      break if successor.nil?

      current_node = successor
      successor = current_node.successor
      count += 1
    end

    count
  end
end

This is a proof-of-concept of a couple of concurrent data structures written in Ruby.

The implementations are heavily commented for those interested. There are benchmarks (with results) included below. The results are interesting, but, as always, take with a grain of salt.

Data structures

AtomicLinkedQueue is a lock-free queue, built on atomic CAS operations. It doesn't use any mutexes.

TwoLockLinkedQueue is a lock-based queue, but with separate locks for the head + tail. This means there can be lots of contention when pushing to the queue, but little when it comes to popping off the queue.

Both of these implementations are unbounded queues, with no blocking operations.

See the individual files below for more about their implementation.

Background

For those unfamiliar with the atomic compare-and-swap operation, here's a brief outline of how it operates.

Create an Atomic instance that holds a value. (This comes from the atomic rubygem).

item = Atomic.new('value')

The workhorse method is #compare_and_set(old, new). You pass it the value that you think it currently holds, and the value you want to set it to.

If it does hold the expected value, it's set to your new value. Otherwise, it returns false. In this implementation, when that happens, the operation is re-started, over and over, until it succeeds.

This compare-and-set operation is hardware-supported and works across Ruby implementations thanks to the 'atomic' gem. The hardware support provides the atomic guarantee. Without this guarantee, it would be possible to read a value, then have another thread update the value before you can, then your thread would overwrite that value. This stuff is complicated.

Due credit

I can't take credit for this implementation. This is an implementation of the pseudo-code from "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"[1], with some hints from Java's implementation of java.util.concurrent.ConcurrentLinkedQueue[2].

  1. http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  2. http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ConcurrentLinkedQueue.java

Benchmark results

These are the results of running the included benchmarks against Rubinius 2.0.0-rc1 and JRuby 1.7.3. Any results from MRI will be superficial due to the global lock, so I omitted it completely.

All of the benchmarks were run against Queue (a fully synchronized queue from Ruby's std lib), TwoLockLinkedQueue (implementation below using two different locks), and AtomicLinkedQueue (the lock-free implementation).

The benchmark_concurrent_reads+writes.rb benchmark allocates some threads to write to the queue, and others to read. So there's always contention for both pushing and popping.

The benchmark_separate_reads+writes.rb benchmark first focuses on filling up the queue to capacity, then emptying it completely. This focuses all of the contention on writing, then all of it on reading.

Rubinius results

$ rbx benchmark_concurrent_reads+writes.rb
                                                        user     system      total        real
Queue - more writers than readers                   4.178510   4.892144   9.070654 (  6.677201)
Queue - more readers than writers                   5.427958   4.914869  10.342827 (  7.545760)
Queue - equal writers and readers                   5.313148   6.802285  12.115433 (  8.720809)
                                                        user     system      total        real
TwoLockLinkedQueue - more writers than readers      5.151256   7.610410  12.761666 (  6.458769)
TwoLockLinkedQueue - more readers than writers      5.395152   8.326897  13.722049 (  6.568123)
TwoLockLinkedQueue - equal writers and readers      6.641767  10.623600  17.265367 (  7.297145)
                                                        user     system      total        real
AtomicLinkedQueue - more writers than readers       2.897964   0.061956   2.959920 (  0.717638)
AtomicLinkedQueue - more readers than writers       2.814892   0.050590   2.865482 (  0.596547)
AtomicLinkedQueue - equal writers and readers       4.175097   0.086688   4.261785 (  0.891113)
                                                        user     system      total        real

$ rbx benchmark_separate_reads+writes.rb
                                                   user     system      total        real
Queue - fill, then empty - 10k                 0.113160   0.120949   0.234109 (  0.159911)
Queue - fill, then empty - 100k                1.035808   1.174422   2.210230 (  1.596514)
Queue - fill, then empty - 1mm                11.258097  12.224407  23.482504 ( 17.325185)
                                                   user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k    0.139143   0.172324   0.311467 (  0.214725)
TwoLockLinkedQueue - fill, then empty - 100k   1.312984   1.671349   2.984333 (  2.233421)
TwoLockLinkedQueue - fill, then empty - 1mm   12.175179  16.069279  28.244458 ( 22.541654)
                                                   user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k     0.071836   0.003300   0.075136 (  0.009811)
AtomicLinkedQueue - fill, then empty - 100k    0.645546   0.011743   0.657289 (  0.147805)
AtomicLinkedQueue - fill, then empty - 1mm     7.075495   0.108397   7.183892 (  1.663006)

JRuby results

$ jruby benchmark_concurrent_reads+writes.rb
                                                         user     system      total        real
Queue - more writers than readers                    0.224000   0.000000   0.224000 (  0.224000)
Queue - more readers than writers                    8.529000   0.000000   8.529000 (  8.529000)
Queue - equal writers and readers                    0.263000   0.000000   0.263000 (  0.262000)
                                                         user     system      total        real
TwoLockLinkedQueue - more writers than readers       1.492000   0.000000   1.492000 (  1.492000)
TwoLockLinkedQueue - more readers than writers       1.788000   0.000000   1.788000 (  1.788000)
TwoLockLinkedQueue - equal writers and readers       2.205000   0.000000   2.205000 (  2.205000)
                                                         user     system      total        real
AtomicLinkedQueue - more writers than readers        1.086000   0.000000   1.086000 (  1.086000)
AtomicLinkedQueue - more readers than writers        0.571000   0.000000   0.571000 (  0.572000)
AtomicLinkedQueue - equal writers and readers        1.049000   0.000000   1.049000 (  1.049000)

$ jruby benchmark_separate_reads+writes.rb
                                                    user     system      total        real
Queue - fill, then empty - 10k                  0.014000   0.000000   0.014000 (  0.014000)
Queue - fill, then empty - 100k                 0.065000   0.000000   0.065000 (  0.065000)
Queue - fill, then empty - 1mm                  0.744000   0.000000   0.744000 (  0.744000)
                                                    user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k     0.032000   0.000000   0.032000 (  0.032000)
TwoLockLinkedQueue - fill, then empty - 100k    0.337000   0.000000   0.337000 (  0.337000)
TwoLockLinkedQueue - fill, then empty - 1mm     4.640000   0.000000   4.640000 (  4.640000)
                                                    user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k      0.016000   0.000000   0.016000 (  0.016000)
AtomicLinkedQueue - fill, then empty - 100k     0.162000   0.000000   0.162000 (  0.162000)
AtomicLinkedQueue - fill, then empty - 1mm      2.706000   0.000000   2.706000 (  2.706000)