Some Thoughts on Streaming Responses

Simon Willison recently touched on the topic of streaming responses - for him it is in Python - and he was collecting experiences people have had with serving them.

I’ve done more than my share of streaming from Ruby so far, and I believe WeTransfer is currently running one of the largest Ruby streaming applications - our download servers. Since a lot of questions came up, I will cover them in order, and maybe I can share my perspective on how we deal with certain issues which arise in this configuration.

Piggybacking on the interface of the server

My former colleague Wander has covered this in more detail for Ruby, but basically: your webserver integration API of choice - be it FastCGI, WSGI, Rack or Go’s http.ResponseWriter - will present you with a certain API for writing output into the socket of the connected user. These APIs come in “push” and “pull” varieties. A “push” API lets you write into the socket yourself; a “pull” API asks an object you provide for the data and writes it out for you. The Rack API (which is the basic stepping stone for most Ruby streaming services) is of the “pull” variety - you give Rack an object which responds to each, and that method has to yield the strings to be written into the socket, successively.

# A Rack body can be any object that responds to #each and yields strings
body = Enumerator.new do |yielder|
  yielder.yield "First chunk"
  sleep 5
  yielder.yield "Second chunk"
end
[200, {}, body]

If you want more performance, you are likely to switch to the Rack hijack API, which allows you to use the “push” model:

# With partial hijack the proc goes into the response headers and receives
# the client socket once the server has written the status and headers
hijack_proc = ->(io) {
  io.write "First chunk"
  sleep 5
  io.write "Second chunk"
  io.close
}
[200, { 'rack.hijack' => hijack_proc }, []]

Other languages and runtimes will do it in a similar way, so that part is fairly straightforward. You do need to make sure that between writing those chunks you somehow yield control to other threads/tasks in your application (so it is really a good idea to dust off asyncio), or just use a runtime which does this for you - like Go or Crystal.

Restarting servers (deployment of new versions)

This is indeed a tricky aspect of these long-running responses. You can’t “just” quickly wait for all connections to finish and restart your server, or decommission your pod/machine/instance and replace it with another. Instead, you are going to use a process called “draining”: you remove your machine from the load balancer so that it won’t be receiving new connections. Then you use some form of monitoring - like Puma stats - to figure out when your connection count has dropped to zero. Once it has, you can shut down the server. In practice this means that when you want to deploy a new version of the software you will either end up with a temporary drop in capacity of 1 unit (when you wait for an instance to drain and then start a new one in its place), or with a temporary overcapacity of 1 unit (if you start a new instance immediately).
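To make “wait for the connection count to drop” concrete, here is a minimal sketch of such a draining loop against Puma’s control server. It assumes Puma runs in clustered mode with the control server enabled (--control-url tcp://127.0.0.1:9293 --control-token sekrit) and a Puma version recent enough to report busy_threads in its stats (5.x and newer) - adjust for your setup:

require "net/http"
require "json"

def wait_until_drained(host: "127.0.0.1", port: 9293, token: "sekrit")
  loop do
    raw = Net::HTTP.get(URI("http://#{host}:#{port}/stats?token=#{token}"))
    workers = JSON.parse(raw).fetch("worker_status", [])
    busy = workers.sum { |w| w.dig("last_status", "busy_threads").to_i }
    break if busy.zero?
    sleep 5 # poll until all in-flight responses have finished
  end
end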

This is somewhat tangential, but a lot of the runtimes we currently use are perfectly capable of “live reloading” - give or take a number of complexities that come along with it. “A Pipeline Made Of Airbags” covers it very well: for a number of tasks we would be better off live-reloading our applications “in situ”, exactly because one of the benefits would be preserving the tasks which are currently servicing connections. We have given this away for the sake of immutable infrastructure, which does have its advantages - but if you intend to serve these streaming responses you might want to consider live reloading / hot swapping, especially if your server fleet is small.

If you want to do it, Puma - for one - supports a “phased restart”, which drains the running worker processes one by one while spawning replacements on the new code.

At WeTransfer, the way we do it is fleetwide draining: we start a draining script on the machines, wait for the connections to drain (bleed off), then update the code on the machines and start the new version right after.

There are ways to pass a file descriptor (socket) from one UNIX process to another, effectively transferring a user connection from your old application code to the new one - but these are very esoteric, plenty can go wrong in flight, and I haven’t heard of them being used much.
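For the curious: in Ruby the underlying SCM_RIGHTS machinery is exposed via UNIXSocket#send_io and #recv_io. A toy sketch of the handoff - a real one would also have to transfer parser state and any buffered bytes, which is exactly where things go wrong in flight:

require "socket"

parent_sock, child_sock = UNIXSocket.pair

if fork
  child_sock.close
  client = TCPServer.new(8080).accept  # the connection we want to hand off
  parent_sock.send_io(client)          # ships the file descriptor itself
  client.close                         # our copy is no longer needed
  Process.wait
else
  parent_sock.close
  client = child_sock.recv_io(TCPSocket) # same connection, different process
  client.write "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nhi\n"
  client.close
end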

How to return errors

Basically - you can’t, because the HTTP protocol doesn’t provide for it. Once you have shipped your headers you no longer have the option of telling the client what exactly has gone wrong, short of forcibly closing the socket. Luckily most browsers will at least know that you have terminated your output prematurely and will alert the user accordingly. If you are using a “sized” response (one with a Content-Length header), every HTTP client will raise an error to the user because the response hasn’t been received in full. If you are using chunked encoding instead, the client will also know that something has gone wrong and won’t consider the response valid - but you can’t tell it what exactly happened. You could potentially use HTTP trailers for the error message, but I doubt many HTTP clients support them.

What you can do is use some kind of error tracking on the server side of things (something like AppSignal or Sentry) and at least register the error there.
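As a sketch - assuming sentry-ruby and the Enumerator-based body from earlier, with hypothetical each_row / format_row helpers - that can be as simple as wrapping the generation loop:

body = Enumerator.new do |yielder|
  each_row do |row|               # hypothetical: produces the report rows
    yielder.yield format_row(row) # hypothetical formatting helper
  end
rescue => e
  Sentry.capture_exception(e) # register the error server-side...
  raise                       # ...then abort, so the client at least sees a truncated response
end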

Resumable downloads

Tricky but doable, given (strict and often unsatisfiable!) preconditions. The problem is essentially one of random access into a byte blob - and it actually consists of several problems. Let’s cover them in order.

Idempotent input

You want your returned resource to be the same on all requests, or - alternatively - you need the response not to change between resumptions. It heavily depends on what you are doing, but most - say - SQL queries are not going to be idempotent if your database accepts writes between requests to your report. You do not have to produce a “baked” response upfront and then serve it piece by piece - but you must be able to reliably reproduce the response from a frozen bit of input, or - at least - reproduce the chunk of it which the Range header is asking for.

If you can do that - produce a “frozen” bit of state from which you can (via some transformation) obtain a byte-addressable resource - you need to make sure the libraries you use for that transformation produce identical output every time. Imagine you have a pipeline like this:

[ SQL resultset ] --> [ CSV writer ] --> [ Client ]

Here, your SQL query and its parameters must be “fixed” and idempotent - so no functions like NOW() should be used. The dataset it queries must, again, be “frozen” and should not change between requests to the resource. Interestingly, Datasette - which Simon built - is a great example of such a frozen dataset server, so this applies very well there!
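To illustrate what “frozen” means here - a hypothetical query shape, with the cutoff pinned once (when the download is first requested) instead of evaluated per request, and with a total ordering so the rows always come back in the same sequence:

# Pinned when the download is first requested, then stored alongside it
frozen_params = { created_before: "2023-01-01T00:00:00Z" }

FROZEN_QUERY = <<~SQL
  SELECT id, x, y, z
  FROM samples
  WHERE created_at < :created_before -- pinned parameter instead of NOW()
  ORDER BY id ASC                    -- total order: every run returns rows in the same sequence
SQL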

If you are using pagination, the requirement stops here - you just need to make sure your paginated URLs return the same results, always. But if you want to provide Range access (random access within a byte blob) - there is another twist.

Random access

The query output goes into a CSV output generator. Now, if all the columns are of fixed and known width (byte size) - which can be achieved by padding them, for example - you can use some offset calculations to know how large one row of your returned dataset is going to be. Based on that, when a request comes in, you can “skip” a certain number of rows and generate only the output which is required. However, this means that the version of your CSV library - as well as the version of the algorithm you use to generate your CSV rows (if you are using custom padding, for example) - must stay the same. This is why Range requests are normally paired with an ETag: the ETag identifies the exact version of the representation, and a resuming client sends it back in If-Range to make sure it is still getting the same bytes.
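A sketch of what that looks like at the HTTP level, assuming a Rack-style env and a hypothetical fingerprint covering both the frozen input and the version of the rendering code:

# If either the input or the rendering code changes, old ranges are invalid
etag = %("csv-v12-#{dataset_fingerprint}") # hypothetical fingerprint
if_range = env["HTTP_IF_RANGE"]

if env["HTTP_RANGE"] && (if_range.nil? || if_range == etag)
  serve_requested_range(env["HTTP_RANGE"], etag) # hypothetical helper
else
  serve_entire_response(etag) # resource changed mid-download: start over
end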

To provide random access you first need to know how large your entire HTTP resource is going to be. If you are outputting variable-size chunks - for example, rows in a CSV without padding them with spaces - you are going to have some trouble, because you do not have a way to precompute the size of your output short of pregenerating the entire response first. And if you do that, it is easier to just dump the result onto some cloud storage and serve it via a redirect from there! But: if you are feeling adventurous, and your dataset is of known size and has known, sizeable chunks, you can “pre-size” your response. For example, imagine you are serving 12 million samples of a 3D point in space, where every sample consists of 3 floating point numbers which you know are all within ±10. You can use fixed decimal output with 5 digits of precision. You can then size your response:

def size_response(num_samples)
  header_size = "x,y,z\n".bytesize
  # One pad character for the sign, one integer digit, 5 decimals: every row has the same width
  single_sample_size = " 0.00000, 0.00000, 0.00000\n".bytesize
  header_size + (single_sample_size * num_samples)
end

Note how we strategically pad the zeroes with a space - this is what we are going to use for the minus sign if we encounter negative values. Based on a computation like this you can output an accurate Content-Length header - and this gets you closer to supporting Range headers too.

To have those, we allow the user to supply a Range header to us and only return the bytes of the response covered by that header. We can totally do that, as long as we can locate the bit of input which produces the desired piece of output, and account for the formatting/transformation. Imagine the user requests the following chunk of our hypothetical “3D point” samples (this would be Range: bytes=8-50):

x,y,z
0.0[0000, 0.00000, 0.00000
0.78211,-0.29090, ]0.29311

Based on the function we had earlier we can try to locate which rows we need to output:

offset_of_first_row = "x,y,z\n".bytesize
row_size = " 0.00000, 0.00000, 0.00000\n".bytesize
first_covered_row = (byte_range_start - offset_of_first_row) / row_size # Assuming int division rounding down
last_covered_row = (byte_range_end - offset_of_first_row) / row_size # Assuming int division rounding down

If our input dataset supports cursors we can then use some black magic to materialize only those rows into the response. We might also need to chop off the first 3 bytes of the first output row, and 7 bytes off the end.
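Putting it together - a sketch of serving only the covered rows with the trimming applied. fetch_rows and format_row are hypothetical helpers, and for brevity this assumes the requested range starts past the header line:

HEADER = "x,y,z\n"
ROW_SIZE = " 0.00000, 0.00000, 0.00000\n".bytesize

def emit_range(byte_range, io)
  first_row = (byte_range.begin - HEADER.bytesize) / ROW_SIZE
  last_row  = (byte_range.end - HEADER.bytesize) / ROW_SIZE
  rendered = fetch_rows(first_row..last_row).map { |row| format_row(row) }.join
  # Trim the partial first row, then take exactly as many bytes as were asked for
  lead_trim = (byte_range.begin - HEADER.bytesize) % ROW_SIZE
  io.write(rendered.byteslice(lead_trim, byte_range.size))
end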

All of the above seems pretty convoluted, and it is - but in practice it can be done if truly necessary. Another approach, which can be used for both pre-sizing and random access, is some kind of “segment manifest” - a list of “segments” which compose your response. Both Google’s download servers and the WeTransfer download servers compose responses from such a sequence. We use an internal format called a “manifest”, which looks roughly like this:

{
  "segments": [
     {"upstream_http_segment_url": "http://some-upstream/blob.bin", "bytesize": 90219},
     {"binary_segment_base64": "OnNtYWxsIGhlYWRlcjo=", "bytesize": 14}
  ]
}

This enables us to compute the size of the resource (by summing up the bytesize values of all the objects in the manifest), but also to provide random access, by finding the objects covered by the requested Range. For example, this is the way we do it for ZIP files: the “metadata” parts of the ZIPs are pregenerated as byte blobs and stored in those Base64 segments, while the included data blobs are streamed through from cloud storage. We can’t reliably generate arbitrary pieces of ZIP headers, but it’s not really necessary - these chunks are “pre-rendered” and available within that segment map, and our library trims the excess bytes off those chunks if needed.
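A sketch of the two lookups such a manifest enables, assuming the JSON layout above - the total size falls out of a summation, and the segments covered by an (inclusive) byte range fall out of a running offset:

def resource_size(manifest)
  manifest.fetch("segments").sum { |segment| segment.fetch("bytesize") }
end

# Returns [segment, range_of_segment_within_the_resource] pairs overlapping byte_range
def segments_for_range(manifest, byte_range)
  offset = 0
  manifest.fetch("segments").filter_map do |segment|
    segment_range = offset...(offset + segment.fetch("bytesize"))
    offset = segment_range.end
    if segment_range.begin <= byte_range.end && byte_range.begin < segment_range.end
      [segment, segment_range]
    end
  end
end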

I know that flussonic and other video streaming servers use a similar approach. Go includes built-in plumbing for this kind of thing (io.SectionReader, io.ReaderAt and the like). Pre-rendering this segment map also means that everything in it will have been generated by a single code deployment of our ZIP library, and will therefore be consistent.

When you have a segment map like this, you can use either http.ServeContent from the Go standard library or something like interval_response in Ruby to address these segments using HTTP Range requests - the latter will give you the ranges within your segments too. I’ve covered the technique in this article in more detail.

Some painkillers

If you have random access to files (using the aforementioned interval_response, say) but those files would be huge, you can instead generate multiple smaller files and use a segment manifest to allow your application to grab just a few of them. For example, you can provide random access to a very large (arbitrarily large!) collated log file which actually consists of many smaller log files, served in sequence. Then you serve only the pieces of the files covered by the request:

for_segment_requested do |segment, range_in_segment|
  segment.seek(range_in_segment.begin, IO::SEEK_SET)
  bytes_to_serve_for_segment = range_in_segment.end - range_in_segment.begin
  IO.copy_stream(segment, client_socket, bytes_to_serve_for_segment)
end

In summary

Yes, true random access to dynamic HTTP resources is difficult to achieve. Streaming is easier, but you will need to be careful with memory bloat if you are using a garbage-collected language. But the few times you do need ’em - you can make ’em happen, as long as you “do the right thing” throughout the pipeline. Most likely you can make your life much easier by dropping certain constraints of this process - for example, the requirement of resumable downloads, or the requirement that responses be pre-sized.