Testing Concurrent Code With Ruby Fibers
This is the second time I have run into a situation where a method takes a block and runs that block in a certain context. This is the Ruby pattern we are all familiar “with” (pun intended):
with_database_connection do |conn|
  conn.batch_insert(records)
end
The most familiar usage of this pattern is, of course, with File objects - we enter a block with the opened file, and the open method then ensures that the file gets closed when we exit the block.
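For reference, a method of this kind is usually just a yield guarded by an ensure. Here is a minimal sketch of that shape; with_scratch_file is a hypothetical helper made up for illustration, built on Ruby's standard Tempfile:

```ruby
require "tempfile"

# Hypothetical helper: yields an open temporary file and guarantees that
# the file is closed and deleted once the block exits, even on error.
def with_scratch_file
  file = Tempfile.new("scratch")
  yield file
ensure
  file&.close! # file is nil if Tempfile.new itself raised
end

leaked = nil
bytes_written = with_scratch_file do |f|
  leaked = f          # keep a reference so we can inspect it afterwards
  f.write("hello")
  f.flush
  File.size(f.path)   # the block's value becomes the method's return value
end
```

The state we care about (an open file) only exists between the yield and the ensure, which is exactly the interval that is awkward to observe from a test.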
What I found to be tricky, though, is testing that this block creates a certain state and holds that state for the duration of the block. For example: at $company a number of our services use locks of some kind. Locking can be implemented with “lock objects” or “lock tokens”, in which case your code would likely have a shape like this:
def do_massive_work(on_some_object)
  lock = acquire_lock(name: "some_object_#{on_some_object.to_param}")
  do_some_work
ensure
  lock&.release # lock is nil if acquire_lock itself raised
end
This works fine, and a lock like this is pretty easy to test. For example, with RSpec:
lock = acquire_lock(name: "lock1")
expect {
  acquire_lock(name: "lock1")
}.to raise_error(Locked)
lock.release
expect {
  acquire_lock(name: "lock1")
}.not_to raise_error # RSpec rejects a specific error class on the negated matcher
But this kind of API is a bit… wordy. With these lovely with_... blocks used all over the place it shouldn’t really be necessary - introducing this API beneath the with_... one feels like doubling your interface surface just for the sake of being able to test your interface.
The particular case where I needed this was when I wanted to use the (quite neat) MySQL feature called GET_LOCK(), which gives you a user-level named lock. It is pretty dandy, as you can do this:
SELECT GET_LOCK("object_graph_from_user_123", 2); -- Second argument is how long you are prepared to wait for the lock to become available
UPDATE comments SET ... WHERE author_id = 123;
UPDATE uploaded_pictures SET ... WHERE uploader_id = 123;
SELECT RELEASE_LOCK("object_graph_from_user_123");
If you have a task which manipulates an object graph, and you want only “one” instance of that task to run at any given time (networks are hard, and nothing in the world is perfect, but it does mostly work), the GET_LOCK() function is a pretty neat feature. ActiveRecord even uses this functionality to ensure that you cannot run multiple migrations at the same time, but those functions are not exposed as part of the public API. Imagine we want a module which provides a with method which would work like this:
DBLock.with("lock_1") do
  # ... do something while holding the lock
end
Writing the module is not hard at all:
def self.with(lock_name, timeout_seconds: 5)
  qname = ActiveRecord::Base.connection.quote(lock_name) # Note there is a limit of 64 bytes on the lock name
  did_acquire = ActiveRecord::Base.connection.select_value("SELECT GET_LOCK(%s, %d)" % [qname, timeout_seconds.to_i])
  raise Unavailable, "Unable to acquire lock #{lock_name.inspect} after #{timeout_seconds} seconds, MySQL returned #{did_acquire.inspect}" unless did_acquire == 1
  yield
ensure
  ActiveRecord::Base.connection.select_value("SELECT RELEASE_LOCK(%s)" % qname) if did_acquire == 1
end
And now the fun part: how do we test it? How do we verify that a lock can only be held by one thread or process?
Way back in 2008 I suggested using artisanal sleep() calls to make concurrent calls happen, but that approach has several problems:
- It is slow. Your test will be sitting there waiting in the sleep() and you will be sitting there waiting for your test to pass. Not ideal.
- It only checks mutations - the outcomes of an operation, not a continuous interval of time when a certain state invariant is in force. Not ideal.
- It is not deterministic - it is hard to predict which of the threads will win.
Luckily, in our modern Ruby world, there is a much better alternative, and that is using Fibers. I’ve covered some Fiber properties in an article I wrote for AppSignal and you could read up there if you want to refresh your memory about how Fiber works. What’s important for us in this case is that Fiber contains within itself a pause button - the Fiber.yield method. Yielding from a fiber means that you literally tell your code to “drop dead” and suspend itself, until you imperatively resume the code with Fiber#resume - this one is an instance method on the Fiber itself. In essence, what people want to say when they posit that “Fibers are for concurrency” could better be worded differently: “Fibers are for sequencing” - they allow you fine control over the order in which code is going to execute.
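To make “Fibers are for sequencing” concrete, here is a small sketch with two fibers whose steps are interleaved in an order that the calling code picks explicitly. The fiber names and the log array are made up for illustration:

```ruby
log = []

# Each fiber does one step, pauses itself, then does a second step.
a = Fiber.new do
  log << "a1"
  Fiber.yield
  log << "a2"
end

b = Fiber.new do
  log << "b1"
  Fiber.yield
  log << "b2"
end

# The caller decides the interleaving, one #resume at a time.
a.resume # runs "a1", then a pauses
b.resume # runs "b1", then b pauses
b.resume # runs "b2", b is finished
a.resume # runs "a2", a is finished
```

Nothing here runs in parallel and nothing is left to a scheduler: the log ends up as a1, b1, b2, a2 on every run, because each step only happens when we ask for it.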
Now, one might wonder: this is a strange contraption and how exactly is that useful for testing our lock?
Here is how it is useful. We will suspend our code while the lock is held. We will enter the section and obtain the lock in one Fiber, which we will then suspend. When that Fiber yields to us we know that it is holding the lock, and it won’t release that lock until we #resume it again. Fibers are “callback-like” and when dealing with this kind of code - for me - it is really, really hard to visualise mentally what is going on. What I found helps a ton is just putting the “execution line numbers” next to your code when trying to understand the flow. So:
fiber = Fiber.new do
  acquire_lock
  Fiber.yield
  release_lock
end
fiber.resume
fiber.resume
will be executing roughly in this order:
   fiber = Fiber.new do
2)   acquire_lock
3)   Fiber.yield
5)   release_lock
   end
1) fiber.resume
4) fiber.resume
When we do the first #resume we are entering the fiber, and acquiring the lock. Then the Fiber will yield back to us and suspend itself. Then we #resume it again and the Fiber will execute the remaining code within itself, to completion. What applies to method bodies also applies to locks, and crucially - our almighty pause button which is the Fiber.yield call - is going to execute in the block as well. So we can place our block into the Fiber:
fiber = Fiber.new do
  DBLock.with("lock_1") do # This is the same as our line 2)
    Fiber.yield # Pause at line 3) with the lock still held by us
    # Running the block to completion means that the
    # `ensure` of `with` will be called and it releases the lock
  end
end
fiber.resume # Bam! Now the lock is held within the suspended Fiber
expect {
  DBLock.with("lock_1", timeout_seconds: 0) do
    raise "Should never be reached"
  end
}.to raise_error(DBLock::Unavailable)
That rids us of all the issues of our previous solution with sleep():
- It is deterministic - our second call to try to obtain the lock is guaranteed to happen after the Fiber we created has grabbed the lock and before it has released it.
- We are testing the existing continual state of the system - the lock is held when we do our assertion. We don’t need to check “saved outputs” of something.
- We don’t need to wait for any sleeps to complete and we can proceed to assert directly.
In this instance we need to ask for a 0-second timeout for things to be near-instant, but the gist still holds.
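The same shape can be tried out without a database. Below is a sketch that swaps the MySQL lock for a hypothetical in-memory MemoryLock module (made up for illustration, not the DBLock from this article) and uses plain Ruby instead of RSpec matchers:

```ruby
# Hypothetical in-memory stand-in for a named lock, for illustration only.
module MemoryLock
  Unavailable = Class.new(StandardError)
  HELD = {}

  def self.with(lock_name)
    raise Unavailable, "#{lock_name.inspect} is already held" if HELD[lock_name]
    HELD[lock_name] = true
    acquired = true
    yield
  ensure
    HELD.delete(lock_name) if acquired # only release what we took ourselves
  end
end

fiber = Fiber.new do
  MemoryLock.with("lock_1") do
    Fiber.yield # suspend while the lock is held
  end
end

fiber.resume # the fiber is now parked inside the block, holding the lock

refused = begin
  MemoryLock.with("lock_1") { raise "Should never be reached" }
  false
rescue MemoryLock::Unavailable
  true
end

fiber.resume # let the block finish, so the `ensure` releases the lock

reacquired = false
MemoryLock.with("lock_1") { reacquired = true }
```

The second attempt is refused while the fiber is suspended, and succeeds once the fiber has been resumed and has left its block.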
With the particular module I am describing there was a catch of course (isn’t there always): by default this is not going to work 😢
For some reason our test… still fails. And there is a reason for it: these locks in MySQL are reentrant within a connection, meaning you can obtain them multiple times:
SELECT GET_LOCK("lock_1", 1); -- returns 1
SELECT GET_LOCK("lock_1", 1); -- returns 1 too!
SELECT RELEASE_LOCK("lock_1"); -- will return 1
SELECT RELEASE_LOCK("lock_1"); -- will return 1
SELECT RELEASE_LOCK("lock_1"); -- will return NULL
This is useful - it means that within a single SQL session you won’t deadlock with yourself. It also means that you have to RELEASE_LOCK as many times as you have done GET_LOCK, since this locking system keeps a checkout counter. For our testing, though, this is a negative, because by default ActiveRecord::Base.connection will give you the connection currently checked out for use by the current thread. Both your assertion and your Fiber will be using the same connection, so both of them will be able to grab the lock. You will only be refused the lock if you try to obtain it from a different MySQL connection, which - in ActiveRecord terms - must be another connection object. And there is a place to get that from - it is the connection pool:
ActiveRecord::Base.connection_pool.with_connection do
  DBLock.with("lock_1") do
    # ... do our locky stuff
  end
end
Except… that won’t work either, because within our Fiber we are still inside the same Thread, and ActiveRecord::Base.connection is still the same object! So even if we change our test to do this:
fiber1 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do
    DBLock.with("lock_1") do
      Fiber.yield
    end
  end
end
fiber2 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do
    DBLock.with("lock_1") do
      Fiber.yield
    end
  end
end
fiber1.resume
expect {
  fiber2.resume
}.to raise_error(DBLock::Unavailable)
our test will still fail, since we are checking out a connection from the ActiveRecord pool but not acquiring our locks through it. And thus, begrudgingly, we change our with to this:
def self.with(lock_name, timeout_seconds: 1, for_connection: ActiveRecord::Base.connection)
  qname = for_connection.quote(lock_name) # Note there is a limit of 64 bytes on the lock name
  did_acquire = for_connection.select_value("SELECT GET_LOCK(%s, %d)" % [qname, timeout_seconds.to_i])
  raise Unavailable, "Unable to acquire lock #{lock_name.inspect} after #{timeout_seconds} seconds, MySQL returned #{did_acquire.inspect}" unless did_acquire == 1
  yield
ensure
  for_connection.select_value("SELECT RELEASE_LOCK(%s)" % qname) if did_acquire == 1
end
We are going to pass the connection to the method and have it acquire the lock through that particular connection:
fiber1 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do |conn|
    DBLock.with("lock_1", for_connection: conn) do
      Fiber.yield
    end
  end
end
fiber2 = Fiber.new do
  ActiveRecord::Base.connection_pool.with_connection do |conn|
    DBLock.with("lock_1", for_connection: conn) do
      Fiber.yield
    end
  end
end
fiber1.resume # Bam! Now the lock is held within the suspended Fiber
expect {
  fiber2.resume
}.to raise_error(DBLock::Unavailable)
and indeed, the test now passes!
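To see why the connection matters so much, the reentrancy rule can be modelled without MySQL at all. The sketch below uses a hypothetical FakeLockServer (an in-memory toy written for this illustration, which only loosely mirrors the return values of GET_LOCK and RELEASE_LOCK) and symbols standing in for sessions:

```ruby
# Hypothetical in-memory model of session-scoped, reentrant named locks.
class FakeLockServer
  def initialize
    @owners = {} # lock name => [owning session, hold count]
  end

  # 1 when granted, 0 when another session holds the lock
  # (roughly what GET_LOCK does with a zero timeout).
  def get_lock(session, name)
    owner, count = @owners[name]
    return 0 if owner && owner != session

    @owners[name] = [session, count.to_i + 1]
    1
  end

  # 1 when released, 0 when held by another session, nil when not held at all.
  def release_lock(session, name)
    owner, count = @owners[name]
    return nil unless owner
    return 0 unless owner == session

    if count > 1
      @owners[name] = [session, count - 1]
    else
      @owners.delete(name)
    end
    1
  end
end

server = FakeLockServer.new

holder = Fiber.new do
  server.get_lock(:session_a, "lock_1")
  Fiber.yield # parked while session_a holds the lock
  server.release_lock(:session_a, "lock_1")
end
holder.resume

same_session  = server.get_lock(:session_a, "lock_1") # reentrant: granted again
other_session = server.get_lock(:session_b, "lock_1") # different session: refused

server.release_lock(:session_a, "lock_1") # undo the reentrant grab
holder.resume                             # the fiber releases its own hold
after_release = server.get_lock(:session_b, "lock_1")
```

As long as the assertion goes through the same session as the suspended fiber, it is simply granted the lock again; only a second session gets refused, which is what passing a separate connection achieves in the real test.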
The same trick can be applied if you want to “suspend” inside of a different method. Imagine you want to have this locking facility within an object, and you place it in a Module that you prepend. Say, with a background job that you reeeally want to run only one of at the same time:
module OnlyOnce
  def perform(*args)
    with_lock("job_abc") do
      super
    end
  end
end
and you want to test that module. What do we do? Well, we can apply the same trick - except that we will need an object which does Fiber.yield from its perform. You can yield this way from anywhere, not only from a hand-written Fiber.new do... block.
job_mock_class = Class.new do
  prepend OnlyOnce
  def perform(*)
    Fiber.yield
  end
end
fiber1 = Fiber.new do
  job_mock_class.new.perform # Our module takes the lock and calls super, and super suspends inside of itself
end
fiber1.resume # now we are holding the lock
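Here is a self-contained sketch of that test carried through to the assertion. It assumes a with_lock backed by a plain in-memory Hash, and the Busy error class is made up for illustration; a real module would take the lock from the database instead:

```ruby
module OnlyOnce
  Busy = Class.new(StandardError) # hypothetical error class
  HELD = {}                       # hypothetical in-memory lock table

  def perform(*args)
    with_lock("job_abc") { super }
  end

  private

  def with_lock(name)
    raise Busy, "#{name} is already running" if HELD[name]
    HELD[name] = true
    acquired = true
    yield
  ensure
    HELD.delete(name) if acquired # only release what we took ourselves
  end
end

job_mock_class = Class.new do
  prepend OnlyOnce

  def perform(*)
    Fiber.yield # suspend in the middle of the "job"
  end
end

fiber1 = Fiber.new { job_mock_class.new.perform }
fiber1.resume # the first job is now parked inside perform, holding the lock

refused = begin
  job_mock_class.new.perform
  false
rescue OnlyOnce::Busy
  true
end

fiber1.resume # the first job finishes and the lock is released

fiber2 = Fiber.new { job_mock_class.new.perform }
fiber2.resume # a new job can take the lock again
second_job_started = OnlyOnce::HELD.key?("job_abc")
fiber2.resume # let it finish and release
```

The second perform is refused before it ever reaches super, so the main program never hits the Fiber.yield inside the mock.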
This applies to any situation where you might want to test code which enters some block and must stay within that block while you make test assertions. Just think of the magic ⏯ and the rest will unwind by itself.