Testing Concurrent Code With Ruby Fibers

This is the second time I have stumbled upon a situation where I have a method which takes a block and runs that block in a certain context. This is the Ruby pattern we are all familiar “with” (pun intended):

with_database_connection do |conn|
  conn.batch_insert(records)
end

The most familiar usage of this pattern is, of course, with File objects - we enter a block with the opened file, and the open method then ensures that the file gets closed when we exit the block.
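
For reference, here is a minimal sketch of how such a block-taking method is usually built. The names with_open_file and demo_with_open_file are made up for this illustration; File.open with a block already does the same thing for you.

```ruby
require "tempfile"

# Hypothetical helper, for illustration only: yields an open file handle
# and guarantees it gets closed afterwards, even if the block raises.
def with_open_file(path, mode = "r")
  file = File.open(path, mode)
  yield file
ensure
  file&.close
end

# Returns true if the handle we saw inside the block is closed once we have left it.
def demo_with_open_file
  Tempfile.create("with_demo") do |tmp|
    tmp.close
    seen_inside_block = nil
    with_open_file(tmp.path, "w") do |f|
      f.write("hello")
      seen_inside_block = f
    end
    seen_inside_block.closed?
  end
end
```

The ensure clause is what makes the state (an open file, a held lock) last exactly as long as the block does.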

What I found to be tricky, though, is testing that this block creates a certain state and holds that state for the duration of the block. For example: at $company a number of our services use locks of some kind. Locking can be implemented with “lock objects” or “lock tokens”, in which case your code would likely have a shape like this:

def do_massive_work(on_some_object)
  lock = acquire_lock(name: "some_object_#{on_some_object.to_param}")
  do_some_work
ensure
  # `lock` is nil if acquire_lock raised, so guard the release
  lock&.release
end

This works fine, and a lock like this is pretty easy to test. For example, with RSpec:

lock = acquire_lock(name: "lock1")
expect {
  acquire_lock(name: "lock1")
}.to raise_error(Locked)

lock.release
expect {
  acquire_lock(name: "lock1")
}.not_to raise_error # RSpec does not accept a specific error class with not_to

But this kind of API is a bit… wordy. With these lovely with_... blocks used all over the place it shouldn’t really be necessary - introducing this API beneath the with_... one feels like doubling your interface surface just for the sake of being able to test your interface.

The particular case where I needed this was when I wanted to use the (quite neat) MySQL feature called GET_LOCK(), which gives you a user-level named lock token. It is pretty dandy, as you can do this:

SELECT GET_LOCK("object_graph_from_user_123", 2); -- Second argument is how long (in seconds) you are prepared to wait for the lock to become available
UPDATE comments SET ... WHERE author_id = 123;
UPDATE uploaded_pictures SET ... WHERE uploader_id = 123;
SELECT RELEASE_LOCK("object_graph_from_user_123");

If you have a task which manipulates an object graph, and you want only “one” instance of that task to run at any given time (networks are hard, and nothing in the world is perfect, but it does mostly work), the GET_LOCK() function is a pretty neat feature. ActiveRecord even uses this functionality to ensure that you cannot run multiple migrations at the same time, but those functions are not exposed as part of the public API. Imagine we want a module which provides a with method which would work like this:

DBLock.with("lock_1") do
  #... do something while holding the lock
end

Writing the module is not hard at all:

module DBLock
  Unavailable = Class.new(StandardError)

  def self.with(lock_name, timeout_seconds: 5)
    qname = ActiveRecord::Base.connection.quote(lock_name) # Note there is a limit of 64 bytes on the lock name
    did_acquire = ActiveRecord::Base.connection.select_value("SELECT GET_LOCK(%s, %d)" % [qname, timeout_seconds.to_i])
    raise Unavailable, "Unable to acquire lock #{lock_name.inspect} after #{timeout_seconds}, MySQL returned #{did_acquire}" unless did_acquire == 1

    yield
  ensure
    # Only release if we actually obtained the lock
    ActiveRecord::Base.connection.select_value("SELECT RELEASE_LOCK(%s)" % qname) if did_acquire == 1
  end
end

And now the fun part: how do we test it? How do we verify that a lock can only be held by one thread or process?


Way back in 2008 I suggested using artisanal sleep() calls to make concurrent calls happen, but that approach has several problems.

Luckily, in our modern Ruby world, there is a much better alternative, and that is using Fibers. I’ve covered some Fiber properties in an article I wrote for Appsignal, and you could read up there if you want to refresh your memory about how Fibers work. What’s important for us in this case is that a Fiber contains within itself a pause button - the Fiber.yield method. Yielding from a fiber means that you literally tell your code to “drop dead” and suspend itself until you imperatively resume it with Fiber#resume - this one is an instance method on the Fiber itself. In essence, what people want to say when they posit that “Fibers are for concurrency” could better be worded as “Fibers are for sequencing” - they give you fine control over the order in which code executes.
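
To make the “sequencing” point tangible, here is a small sketch which records the order in which things run. The method name fiber_sequencing_demo and the event names are invented for this illustration; only Fiber.new, Fiber.yield and Fiber#resume are real Ruby API.

```ruby
# Records the order in which the caller and the fiber execute their steps.
def fiber_sequencing_demo
  events = []
  fiber = Fiber.new do
    events << :fiber_started
    Fiber.yield # the pause button: suspend until someone calls #resume
    events << :fiber_finished
  end

  events << :before_first_resume
  fiber.resume # runs the fiber body up to Fiber.yield
  events << :fiber_is_paused
  fiber.resume # continues right after Fiber.yield, to completion
  events << :after_second_resume
  events
end

p fiber_sequencing_demo
```

Nothing inside the fiber runs until the first resume, and nothing after Fiber.yield runs until the second one, so the caller decides exactly when each half executes.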


Now, one might wonder: this is a strange contraption and how exactly is that useful for testing our lock?

Here is how it is useful. We will suspend our code while the lock is held. We will enter the section and obtain the lock in one Fiber which we will then suspend. When that Fiber yields to us we know that it is holding the lock within itself, and it won’t release the lock it is holding until we #resume it again. Fibers are “callback-like” and when dealing with this kind of code - for me - it is really really hard to visualise mentally what is going on. What I found helps a ton is just putting the “execution line numbers” next to your code when trying to understand the flow. So:

fiber = Fiber.new do
  acquire_lock
  Fiber.yield
  release_lock
end

fiber.resume
fiber.resume

will be executing roughly in this order:

   fiber = Fiber.new do
2)   acquire_lock
3)   Fiber.yield
5)   release_lock
   end
 
1) fiber.resume 
4) fiber.resume

When we do the first #resume we are entering the fiber and acquiring the lock. Then the Fiber will yield back to us and suspend itself. Then we #resume it again and the Fiber will execute the remaining code within itself, to completion. What applies to method bodies also applies to blocks, and crucially - our almighty pause button, which is the Fiber.yield call - is going to execute in the block as well. So we can place our block into the Fiber:

fiber = Fiber.new do
  DBLock.with("lock_1") do # This is the same as our line 2)
    Fiber.yield # Pause at line 3) with the lock still held by us
    # Running the block to completion means that the
    # `ensure` of `with` will be called and it releases the lock
  end
end

fiber.resume # Bam! Now the lock is held within the suspended Fiber
expect {
  DBLock.with("lock_1", timeout_seconds: 0) do # 0-second timeout: fail right away instead of waiting
    raise "Should never be reached"
  end
}.to raise_error(DBLock::Unavailable)

That rids us of all of the issues of our previous solution with sleep().

In this instance we need to ask for a 0-second timeout for things to be near-instant, but the gist still holds.
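
If you want to try the pattern without a database, here is a minimal sketch using an in-memory stand-in for the lock. MemoryLock and memory_lock_demo are hypothetical names made up for this example, and the stand-in is deliberately simplistic (one process, no timeouts).

```ruby
require "set"

# Hypothetical in-memory stand-in for DBLock, for illustration only.
module MemoryLock
  Unavailable = Class.new(StandardError)
  HELD = Set.new

  def self.with(name)
    # Set#add? returns nil when the name is already present
    raise Unavailable, "#{name.inspect} is already held" unless HELD.add?(name)
    begin
      yield
    ensure
      HELD.delete(name)
    end
  end
end

def memory_lock_demo
  fiber = Fiber.new do
    MemoryLock.with("lock_1") { Fiber.yield } # pause while holding the lock
  end
  fiber.resume # the lock is now held inside the suspended fiber

  while_held = begin
    MemoryLock.with("lock_1") { :entered }
  rescue MemoryLock::Unavailable
    :refused
  end

  fiber.resume # let the block run to completion, which releases the lock
  after_release = MemoryLock.with("lock_1") { :entered }
  [while_held, after_release]
end
```

The second resume matters just as much as the first: it lets the ensure run, so you can also assert that the lock becomes available again.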


With the particular module I am describing there was a catch of course (isn’t there always): by default this is not going to work 😢

Our test... still fails, and there is a reason for it: these locks in MySQL are reentrant within a connection, meaning you can obtain them multiple times:

SELECT GET_LOCK("lock_1", 1); -- returns 1
SELECT GET_LOCK("lock_1", 1); -- returns 1 too!
SELECT RELEASE_LOCK("lock_1"); -- will return 1
SELECT RELEASE_LOCK("lock_1"); -- will return 1
SELECT RELEASE_LOCK("lock_1"); -- will return NULL
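
The session above can be modelled with a toy class, which I find helps to see why our test gets fooled. FakeLockServer is a hypothetical name and a rough sketch of the counting behaviour only: it assumes a zero timeout and is not how MySQL implements anything.

```ruby
# Hypothetical model of named locks that are reentrant per connection.
class FakeLockServer
  def initialize
    @owners = {} # lock name => [connection_id, hold_count]
  end

  # Like GET_LOCK with a zero timeout: 1 on success, 0 if another connection holds it.
  def get_lock(connection_id, name)
    owner, count = @owners[name]
    return 0 if owner && owner != connection_id

    @owners[name] = [connection_id, count.to_i + 1]
    1
  end

  # Like RELEASE_LOCK: 1 if released, 0 if held by another connection,
  # nil if the lock does not exist.
  def release_lock(connection_id, name)
    owner, count = @owners[name]
    return nil unless owner
    return 0 unless owner == connection_id

    if count > 1
      @owners[name] = [owner, count - 1]
    else
      @owners.delete(name)
    end
    1
  end
end

server = FakeLockServer.new
server.get_lock(:conn_a, "lock_1")     # first acquisition succeeds
server.get_lock(:conn_a, "lock_1")     # same connection: succeeds again
server.get_lock(:conn_b, "lock_1")     # different connection: refused
```

The lock is tied to the connection, not to the piece of Ruby code which asked for it, so two callers sharing one connection can never lock each other out.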

This is useful: within a single SQL session you won’t deadlock with yourself. It also means that you have to call RELEASE_LOCK as many times as you have called GET_LOCK, since this locking system keeps a checkout counter. For our testing, though, it is a negative, because by default ActiveRecord::Base.connection gives you the connection currently checked out by the active thread. Both your assertion and your Fiber will be using the same connection, so both of them will be able to grab the lock. You will only be refused the lock if you try to obtain it from a different MySQL connection, which - in ActiveRecord terms - must be another connection object. And there is a place to get that from - it is the connection pool:

ActiveRecord::Base.connection_pool.with_connection do
  DBLock.with("lock_1") do
    # ... do our locky stuff
  end
end

Except… that won’t work either, because within our Fiber we still are inside of the same Thread, and the ActiveRecord::Base.connection is still the same object! So even if we change our test to do this:

fiber1 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do
    DBLock.with("lock_1") do
      Fiber.yield
    end
  end
end

fiber2 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do
    DBLock.with("lock_1") do
      Fiber.yield
    end
  end
end

fiber1.resume
expect {
  fiber2.resume
}.to raise_error(DBLock::Unavailable)

our test will still fail, since we are checking out a connection from the ActiveRecord pool but not acquiring our locks through it. And thus, begrudgingly, we change our with method to this:

def self.with(lock_name, timeout_seconds: 1, for_connection: ActiveRecord::Base.connection)
  qname = for_connection.quote(lock_name) # Note there is a limit of 64 bytes on the lock name
  did_acquire = for_connection.select_value("SELECT GET_LOCK(%s, %d)" % [qname, timeout_seconds.to_i])
  raise Unavailable, "Unable to acquire lock #{lock_name.inspect} after #{timeout_seconds}, MySQL returned #{did_acquire}" unless did_acquire == 1

  yield
ensure
  for_connection.select_value("SELECT RELEASE_LOCK(%s)" % qname) if did_acquire == 1
end

We are going to pass the connection to the method and have it acquire the lock through that particular connection:

fiber1 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do |conn|
    DBLock.with("lock_1", for_connection: conn) do
      Fiber.yield
    end
  end
end

fiber2 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do |conn|
    DBLock.with("lock_1", for_connection: conn) do
      Fiber.yield
    end
  end
end

fiber1.resume # Bam! Now the lock is held within the suspended Fiber
expect {
  fiber2.resume
}.to raise_error(DBLock::Unavailable)

and indeed, the test now passes!

The same trick can be applied if you want to “suspend” inside of a different method. Imagine you want to have this locking facility within an object, and you place it in a Module that you prepend. Say, with a background job that you reeeally want to run only one instance of at a time:

module OnlyOnce
  def perform(*args)
    DBLock.with("job_abc") do
      super
    end
  end
end

and you want to test that module. What do we do? Well, we can apply the same trick - except that we will need an object which does Fiber.yield from its perform. You can yield this way from anywhere, not only from a hand-written Fiber.new do... block.

job_mock_class = Class.new do
  prepend OnlyOnce
  def perform(*)
    Fiber.yield
  end
end

fiber1 = Fiber.new do
  job_mock_class.new.perform # Our module takes the lock and calls super, and super suspends inside of itself
end
fiber1.resume # now we are holding the lock
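
To round the test off you would then try to run a second job and expect it to be refused. Here is a self-contained sketch of the whole thing, with a hypothetical in-memory FakeJobLock standing in for the database lock so that it runs anywhere. OnlyOnceSketch and only_once_demo are likewise names made up for this example.

```ruby
# Hypothetical in-memory lock standing in for the database-backed one.
module FakeJobLock
  Unavailable = Class.new(StandardError)
  @held = {}

  def self.with(name)
    raise Unavailable, "#{name} is already held" if @held[name]

    @held[name] = true
    begin
      yield
    ensure
      @held.delete(name)
    end
  end
end

# Same shape as OnlyOnce, but using the in-memory lock.
module OnlyOnceSketch
  def perform(*args)
    FakeJobLock.with("job_abc") { super }
  end
end

def only_once_demo
  job_class = Class.new do
    prepend OnlyOnceSketch
    def perform(*)
      Fiber.yield # suspend mid-job, while the prepended module holds the lock
    end
  end

  first_run = Fiber.new { job_class.new.perform }
  first_run.resume # the first job is now paused inside perform

  second_run = begin
    job_class.new.perform
    :ran
  rescue FakeJobLock::Unavailable
    :refused
  end

  first_run.resume # let the first job finish and release the lock
  second_run
end
```

The second perform is refused before it ever reaches super, so it never gets to call Fiber.yield outside of a fiber.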

This applies to any situation where you might want to test code which enters some block and must stay within that block while you make test assertions. Just think of the magic ⏯ and the rest will unwind by itself.