A simple TypeScript task queue.
This library is built with low overhead in mind:
- 🏎️ Specify the number of parallel tasks
- 🔒 Define capacity
- ⏱️ React to queue position changes
- ⏭️ Stream queue position and values
- 🚫 Discard tasks
You can install this package using your favorite package manager from npm or jsr.
you can pick one of the following commands:
# npm
npm install @sv2dev/queue
bun install @sv2dev/queue
pnpm install @sv2dev/queue
yarn install @sv2dev/queue
deno add npm:@sv2dev/queue
# jsr
npx jsr add @sv2dev/queue
bunx jsr add @sv2dev/queue
pnpm dlx jsr add @sv2dev/queue
yarn dlx jsr add @sv2dev/queue
deno add jsr:@sv2dev/queue
import { Queue } from "@sv2dev/queue";
const queue = new Queue();
In this example, the tasks are executed one after the other.
queue.add(async () => {});
queue.add(async () => {});
You can also add synchronous tasks to the queue.
queue.add(async () => {});
queue.add(() => {});
In this example, always two tasks are executed in parallel. If one task is finished, another one is started.
const queue = new Queue({ parallelize: 2 });
queue.add(async () => {});
queue.add(async () => {});
queue.add(async () => {});
queue.add(async () => {});
You can also use the parallelize
property to change the number of parallel tasks at runtime.
Scaling up the number of parallel tasks will immediately start queued tasks to match the new parallel count.
// schedule some tasks
queue.parallelize = 3;
// more of these tasks will be executed in parallel
Scaling down the number of parallel tasks will not affect already running tasks, but queued tasks will wait until the number of running tasks is reduced to the new parallel count.
const queue = new Queue({ parallelize: 3 });
// schedule some tasks
queue.parallelize = 1;
// less tasks will be executed in parallel when the running tasks are finished
The queue will reject new tasks if it is full. By default, the queue can hold an arbitrary number of tasks.
But the capacity can be limited by setting the max
option.
const queue = new Queue({ max: 2 });
const res1 = queue.add(async () => {});
const res2 = queue.add(async () => {});
const res3 = queue.add(async () => {});
// res1 and res2 are Promises that resolve when the task is finished.
// res3 is null, because the queue is full.
You can also change the capacity at runtime. This will not affect already queued tasks.
queue.max = 1;
In this example, a position listener is passed to the add
method. It will be called every time the queue position changes.
queue.add(
async () => {},
(pos) => {
if (pos === 0) {
console.log(`Task is no longer queued and running`);
} else {
console.log(`This task is at queue position ${pos}`);
}
}
);
Alternatively, you can also use the listener
option to pass a listener function to the add
method.
queue.add(async () => {}, {
listener: (pos) => {
console.log(`This task is at queue position ${pos}`);
},
});
In this example, the task will stream the queue position and the task values.
const iterable = queue.iterate(async () => {});
for await (const [pos, value] of iterable!) {
if (pos === null) {
console.log(`Task emitted a value: ${value}`);
} else if (pos === 0) {
console.log(`Task is no longer queued and running`);
} else {
console.log(`Task is at queue position ${pos}`);
}
}
Tasks that not only return a result but yield values can be streamed as well.
const iterable = queue.iterate(async function* () {
yield "Hello,";
yield "world!";
});
for await (const [pos, value] of iterable!) {
if (pos === null) {
console.log(`The task yielded this value: ${value}`);
}
}
This can also be used with the add
method. In this case, only the last yielded value is returned.
const result = await queue.add(async function* () {
yield "Hello,";
yield "world!";
});
console.log(result); // "world!"
There are several ways to discard tasks.
If you have an iterable, you can break the iteration to discard the task.
const iterable = queue.iterate(async () => {});
for await (const [pos, value] of iterable!) {
break; // Discards the task
}
This only works if the task is not already running.
If you pass an AbortSignal
to the add
or iterate
method, the task will be discarded if the signal is aborted.
const ctrl = new AbortController();
const promise = queue.add(async () => {}, { signal: ctrl.signal });
You can also pass the AbortSignal
directly as the options object.
const ctrl = new AbortController();
const promise = queue.add(async () => {}, ctrl.signal);
For example, if you want to discard a task after a certain time, you can do something like this:
const promise = queue.add(async () => {}, AbortSignal.timeout(10_000));