UI Algorithms: A Tiny Promise Queue

I’ve needed this before - a couple of times, just like that other thing. A situation where I am doing uploads using AJAX - or performing some other long-running frontend tasks - and I don’t want to overwhelm the system by running all of them at the same time. These tasks may, in turn, trigger other tasks… you know the drill. And yet again, the published implementations such as p-queue and promise-queue-plus and the one described in this blog post left me wondering: why do they have to be so big? And do I really have to carry an NPM dependency for something so small?

Turns out - not really. It’s just that a promise queue is conceptually a delicate thing to get “just right”. Once that “just right” is well understood - it’s just a dozen lines. Oh, and don’t use setTimeout. I’ll explain. Let’s define what we want first: we have some long-running async tasks, and we want at most a fixed number of them to be running at any given moment.

Why limit the number of tasks? Well… imagine you are working on a very high volume file upload service, and you can have your users uploading thousands of files in one go. If you are not careful, you may start all of these uploads at once. A browser will, usually, restrict your requests to 6 in parallel (with HTTP 1.1 at least), but your end-users won’t be happy about it. Those uploads will saturate their entire uplink, hard - so they won’t be doing any Google Meet calls while their upload is running. If your tasks do something processor-intensive - like calculating checksums for the chunks being uploaded - they won’t be throttled by the browser, and you may end up consuming one whole core of the user’s CPU as well.

So: if you are dealing with long-running async tasks based on unbounded user input (N files, M rows etc…) it is generally a very good idea to use such a queue and let it deal with the concurrency aspects of it all.

Additionally, if you are running a small service - having something bombard it with requests may be very destructive. Consider: you are running a Puma webserver with 6 threads. Since you are on a shoestring budget, you don’t really want to get more boxes, and running multiple processes may be annoying for other reasons. You, therefore, have a hard limit on your app handling 6 requests in parallel. Upload requests may be slow because your fronting web server may not be doing much buffering. Once a browser starts doing 6 uploads, your capacity has been saturated.

Moreover - you render just-in-time thumbnails from that same web app. The thumbnails are for the images getting uploaded. The images appear in your uploaded files gallery immediately as the upload completes - and the browser starts trying to retrieve the thumbnails for them. But the web app is already busy processing the next 6 uploads! The thumbnails spin for a while and then display a “broken image” icon instead. Not good.

I find it a good default to limit those situations to 3 uploads at the same time.

Promising, Yet Tricky

A Promise is, in fact, a somewhat limited API. It is not quite like a Task. For example, we could imagine a Task type presenting the following API:

const fetchTask = new Task(async () => {
  await doThingOne();
  await doThingTwo();
  return await doThingThree();
});
// and then somewhere later - for instance, from a queue
fetchTask.start();

This is not how JS promises work, though: a Promise begins “doing stuff” immediately as it gets instantiated - its executor function runs synchronously, during construction. The “stuff” it is doing can be deferred (the executor can asynchronously call resolve() or reject() at any arbitrary later point), but the promise itself is already seen as “running”.
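A minimal demonstration of that eagerness - no awaiting is needed for the work to start:

```javascript
// The executor function passed to the Promise constructor runs
// immediately and synchronously - before .then() is ever called.
const order = [];
const p = new Promise((resolve) => {
  order.push("executor ran");
  resolve(42);
});
order.push("promise constructed");
p.then((value) => order.push(`resolved with ${value}`));
// At this point, synchronously, order is already
// ["executor ran", "promise constructed"] - the work has started
// without anyone awaiting the promise.
```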

So, most APIs for queues of Promises use an “async start” callback function, which then has to be enqueued - not the Promise itself:

queue.push(() => fetch("http://myco.org/api/v0/entries.json"))

Note how we do not enqueue the return value of fetch() - which is itself a Promise - but a function that returns a Promise. Conveniently enough, that function can also be any async function, which our queue can call at the moment it has a free slot.
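The distinction is easy to see with a stand-in for fetch() - fakeFetch here is a hypothetical stub that just counts how many times the underlying work actually started:

```javascript
// Hypothetical stand-in for fetch(), counting how often work begins.
let started = 0;
const fakeFetch = () => {
  started++;
  return Promise.resolve("response body");
};

// Eager: the work starts the moment this line runs - a queue receiving
// this value could not delay anything.
const eagerPromise = fakeFetch(); // started === 1 already

// Deferred: wrapping the call in a function postpones the work until
// somebody (our queue) decides to invoke it.
const deferredStart = () => fakeFetch(); // started is still 1
deferredStart(); // started becomes 2 - this is what the queue does
```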

Defining our API

So, let’s sketch out the API that we want:

const maxConcurrent = 4;
const queue = createQueue(maxConcurrent);
for (let f of droppedFiles) {
  let asyncStart = () => {
    return fetch("/put-upload",
      {
        method: "PUT",
        headers: {"content-type": "binary/octet-stream"},
        body: f
      });
  }
  queue.push(asyncStart)
}
queue.running // returns at most 4

If we need other things to happen after our task takes place, we can either let it happen in the same async start function:

let asyncStart = async () => {
  let response = await fetch("/put-upload", {method: "PUT", body: ...});
  let thumb = await response.json();
  someReactiveStore.update("thumbUrl", thumb.url);
}

or place the subsequent async calls into the same queue:

let asyncStart = async () => {
  let response = await fetch("/put-upload", {method: "PUT", body: ...});
  queue.push(async () => {
    let thumb = await response.json()
    someReactiveStore.update("thumbUrl", thumb.url);
  });
}

The defining guardrail I think is most important: the queue must start the next task whenever a running one settles - regardless of whether it resolved or rejected.

For that, we have a handy callback on any Promise called finally() - let’s make use of it. Conveniently, finally() is also going to be the only event callback we will need to change state in our queue.
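Before wiring it into the queue, a quick standalone sanity check that finally() fires for both outcomes:

```javascript
// finally() fires whether the promise resolved or rejected - exactly
// what a queue needs in order to free up a slot in both cases.
const log = [];

Promise.resolve("ok")
  .finally(() => log.push("slot freed after resolve"));

Promise.reject(new Error("boom"))
  .finally(() => log.push("slot freed after reject"))
  .catch(() => {}); // swallow the rejection so it is not reported as unhandled
```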

function createQueue(maxConcurrent) {
  const tasks = [];
  let running = 0;
  const startNext = () => {
    if (running >= maxConcurrent) return;
    const nextTask = tasks.shift();
    if (!nextTask) return;
    running++;
    nextTask();
  }
  return {
    get running() { return running; },
    get pending() { return tasks.length; },
    push(asyncStartFn) {
      const task = () => {
        const taskPromise = asyncStartFn();
        taskPromise.finally(() => {
          running--;
          startNext();
        });
      };
      tasks.push(task);
      startNext();
    }
  }
}

And here is how we can use it:

const q = createQueue(3);
q.push(someFuncThatReturnsPromise);

…and see whether anything is running:

if (q.running > 0) {
  console.debug(`Still ${q.running} tasks running, ${q.pending} pending`);
}
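To see the cap in action, here is a quick self-contained check. sleep is a hypothetical helper simulating a slow task, and the queue from above is inlined so the snippet runs on its own:

```javascript
function createQueue(maxConcurrent) {
  const tasks = [];
  let running = 0;
  const startNext = () => {
    if (running >= maxConcurrent) return;
    const nextTask = tasks.shift();
    if (!nextTask) return;
    running++;
    nextTask();
  };
  return {
    get running() { return running; },
    get pending() { return tasks.length; },
    push(asyncStartFn) {
      const task = () => {
        asyncStartFn().finally(() => {
          running--;
          startNext();
        });
      };
      tasks.push(task);
      startNext();
    },
  };
}

// Hypothetical stand-in for a slow upload.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const q = createQueue(3);
let peak = 0;
let finished = 0;
for (let i = 0; i < 10; i++) {
  q.push(async () => {
    peak = Math.max(peak, q.running); // how many are running right now?
    await sleep(5);
    finished++;
  });
}
// Only 3 tasks ever execute at once (peak === 3); right after the loop,
// the other 7 are waiting in the queue (q.pending === 7).
```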

Adding a neat touch from p-queue

There is a lovely little thing in p-queue though: pushing into a queue also returns a Promise, which will resolve or reject once the queue has picked up your task and that task has resolved or rejected. From the user’s standpoint it looks like this:

await queue.add(() => got('https://sindresorhus.com'));

Let’s add this too. To accomplish it, we want to return a Promise that will resolve or reject once our queue gets to the task and makes the task’s Promise either resolve or reject:

//...
push(asyncStartFn) {
  return new Promise((outerResolve, outerReject) => {
    const task = () => {
      const taskPromise = asyncStartFn();
      taskPromise.
        then(outerResolve, outerReject).
        finally(() => {
          console.debug(`Finished task, running: ${running}, queue: ${tasks.length}`);
          running--;
          startNext();
        });
    };
    tasks.push(task);
    startNext();
  });
}

That is a bit of a callback soup for sure, but such is the nature of async JS. And then, getting back to our bulk upload use case:

const maxConcurrent = 4;
const queue = createQueue(maxConcurrent);
for (const f of droppedFiles) {
  let asyncStart = () => {
    return fetch("/put-upload",
      {
        method: "PUT",
        headers: {"content-type": "binary/octet-stream"},
        body: f
      });
  }
  queue.push(asyncStart).then(() => {
    displayNotice(`${f.name} upload complete.`);
  });
}
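Since push() now returns a Promise, failed uploads can be reported per-file too. A minimal sketch of the forwarding mechanics, using failingUpload as a hypothetical stand-in for a fetch() that hits a network error:

```javascript
// Hypothetical stand-in for an upload that fails; fetch() would reject
// the same way on a network error.
const failingUpload = () => Promise.reject(new Error("connection reset"));

// The same wrapper shape our push() uses: then(outerResolve, outerReject)
// forwards both outcomes to the promise the caller holds.
const wrapped = new Promise((outerResolve, outerReject) => {
  failingUpload().then(outerResolve, outerReject);
});

const notices = [];
wrapped
  .then(() => notices.push("upload complete"))
  .catch((err) => notices.push(`upload failed: ${err.message}`));
```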

Not that hard, and only 26 lines. My jam.