Stream processing for potygen using node-postgres
Perform streaming queries with @ovotech/potygen.
Supports:
- async iterators
- node streams
- forEach function
It lets you use a Cursor to split up a large query response and retrieve only a subsection of it at a time, iterating through it efficiently. It also helps with integrating other tools that use generators or node streams.
Each Batch
The simplest of the helpers calls a callback for each "batch" of items until all the results have been exhausted.
const productsQuery = toEachBatch(sql`SELECT product FROM orders WHERE region = $region`, { batchSize: 2 });
await productsQuery(db, { region: 'Sofia' }, async (batch) => {
  console.log(batch);
});
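To illustrate the batching behaviour described above, here is a minimal, self-contained sketch of a callback-per-batch loop. It does not use potygen or node-postgres: `ArrayCursor`, `eachBatch` and `collectBatches` are hypothetical names, and an in-memory array plays the role of the database cursor.

```typescript
// Hypothetical stand-in for a database cursor: hands out rows in chunks.
class ArrayCursor<T> {
  private position = 0;
  private readonly rows: T[];

  constructor(rows: T[]) {
    this.rows = rows;
  }

  // Resolves with up to `count` rows; an empty array means the cursor is exhausted.
  async read(count: number): Promise<T[]> {
    const batch = this.rows.slice(this.position, this.position + count);
    this.position += batch.length;
    return batch;
  }
}

// Hypothetical helper: awaits the callback for each batch before reading the next one.
async function eachBatch<T>(
  cursor: ArrayCursor<T>,
  batchSize: number,
  callback: (batch: T[]) => Promise<void>,
): Promise<void> {
  for (;;) {
    const batch = await cursor.read(batchSize);
    if (batch.length === 0) {
      return;
    }
    await callback(batch);
  }
}

// Records the batches seen for an illustrative five-row result set.
async function collectBatches(): Promise<string[][]> {
  const cursor = new ArrayCursor(['apple', 'pear', 'plum', 'fig', 'kiwi']);
  const seen: string[][] = [];
  await eachBatch(cursor, 2, async (batch) => {
    seen.push(batch);
  });
  return seen;
}
```

Because the callback is awaited before the next read, at most one batch is in flight at any time in this sketch; the last batch may be smaller than the requested size.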
Async Iterator
Using JavaScript's async iterators, you can iterate through the results while keeping only a single batch in memory, using the humble for await...of loop.
const productsQuery = toAsyncIterator(sql`SELECT product FROM orders WHERE region = $region`, { batchSize: 2 });
for await (const item of productsQuery(db, { region: 'Sofia' })) {
  console.log(item);
}
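The "single batch in memory" idea can be sketched with plain async generators. This is an illustration of the general technique only, not the library's implementation: `readBatches`, `flattenBatches` and `collectItems` are hypothetical names, and an in-memory array stands in for the query result.

```typescript
// Hypothetical batch source: yields an in-memory array in chunks of `batchSize`.
async function* readBatches<T>(rows: T[], batchSize: number): AsyncGenerator<T[]> {
  for (let start = 0; start < rows.length; start += batchSize) {
    yield rows.slice(start, start + batchSize);
  }
}

// Flattens batches into single items. The next batch is only requested
// once every item of the current one has been consumed.
async function* flattenBatches<T>(batches: AsyncIterable<T[]>): AsyncGenerator<T> {
  for await (const batch of batches) {
    for (const item of batch) {
      yield item;
    }
  }
}

// Consumes the flattened iterator item by item.
async function collectItems(): Promise<string[]> {
  const items: string[] = [];
  for await (const item of flattenBatches(readBatches(['apple', 'pear', 'plum'], 2))) {
    items.push(item);
  }
  return items;
}
```

Generators are pull-based, so the consumer's pace determines when the next batch is fetched.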
Async Batch Iterator
The same as toAsyncIterator, but keeps the batches intact and retrieves them whole.
const productsQuery = toAsyncBatchIterator(sql`SELECT product FROM orders WHERE region = $region`, { batchSize: 2 });
for await (const batch of productsQuery(db, { region: 'Sofia' })) {
  console.log(batch);
}
Stream
You can also use node streams to process the data, either in batches or one by one.
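// Assumption: asyncPipeline is the promisified pipeline from node's stream module
import { Writable, pipeline } from 'stream';
import { promisify } from 'util';

const asyncPipeline = promisify(pipeline);

const productsQuery = toReadable(sql`SELECT product FROM orders WHERE region = $region`, { batchSize: 2 });
// An object-mode sink that logs every chunk it receives
const sink = new Writable({
  objectMode: true,
  write: (chunk, encoding, callback) => {
    console.log(chunk);
    callback();
  },
});
const source = productsQuery(db, { region: 'Sofia' });
await asyncPipeline(source, sink);
console.log('Done');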
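For readers who want to experiment with the stream shape without a database, the following sketch builds an object-mode readable from an async generator using node's own `Readable.from` and the promise-based `pipeline` from `stream/promises`. It is not how `toReadable` is implemented; `readBatches` and `streamBatches` are hypothetical names and the rows are made up.

```typescript
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical batch source: yields an in-memory array in chunks of `batchSize`.
async function* readBatches(rows: string[], batchSize: number): AsyncGenerator<string[]> {
  for (let start = 0; start < rows.length; start += batchSize) {
    yield rows.slice(start, start + batchSize);
  }
}

// Pipes the batches into an object-mode sink and returns what the sink received.
async function streamBatches(): Promise<string[][]> {
  const received: string[][] = [];
  // Readable.from wraps an (async) iterable and uses object mode by default.
  const source = Readable.from(readBatches(['apple', 'pear', 'plum'], 2));
  const sink = new Writable({
    objectMode: true,
    write(chunk: string[], _encoding, callback) {
      received.push(chunk);
      callback();
    },
  });
  // Resolves once the source has ended and the sink has finished.
  await pipeline(source, sink);
  return received;
}
```

The sink signals readiness for the next chunk by calling `callback`, which is what gives streams their backpressure.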
Mapped queries
All of the streaming helpers support mapped queries; the map is executed on each batch after it is retrieved.
const productsQuery = sql<MyQuery>`SELECT product FROM orders WHERE region = $region`;
const mappedProductsQuery = mapResult(
  (rows) => rows.map((row) => ({ ...row, productLength: row.product.length })),
  productsQuery,
);
const secondMappedProductsQuery = mapResult(
  (rows) => rows.map((row) => ({ ...row, productLengthSquare: Math.pow(row.productLength, 2) })),
  mappedProductsQuery,
);
const productsIterator = toAsyncIterator(secondMappedProductsQuery, { batchSize: 2 });
for await (const item of productsIterator(db, { region: 'Sofia' })) {
  console.log(item);
}
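As a rough mental model of "the map runs on each batch", the sketch below chains two per-batch mappers over an in-memory batch source. It is an assumption-laden illustration, not potygen's `mapResult`: `readBatches`, `mapBatches`, `collectMapped` and the sample rows are all hypothetical.

```typescript
type Row = { product: string };
type MappedRow = Row & { productLength: number; productLengthSquare: number };

// Hypothetical batch source: yields an in-memory array in chunks of `batchSize`.
async function* readBatches<T>(rows: T[], batchSize: number): AsyncGenerator<T[]> {
  for (let start = 0; start < rows.length; start += batchSize) {
    yield rows.slice(start, start + batchSize);
  }
}

// Hypothetical helper: applies `mapper` to each whole batch as it arrives.
async function* mapBatches<A, B>(
  batches: AsyncIterable<A[]>,
  mapper: (rows: A[]) => B[],
): AsyncGenerator<B[]> {
  for await (const batch of batches) {
    yield mapper(batch);
  }
}

// Chains two mappers, mirroring the two mapped queries above.
async function collectMapped(): Promise<MappedRow[]> {
  const rows: Row[] = [{ product: 'apple' }, { product: 'fig' }, { product: 'kiwi' }];
  const withLength = mapBatches(readBatches(rows, 2), (batch) =>
    batch.map((row) => ({ ...row, productLength: row.product.length })),
  );
  const withSquare = mapBatches(withLength, (batch) =>
    batch.map((row) => ({ ...row, productLengthSquare: Math.pow(row.productLength, 2) })),
  );
  const result: MappedRow[] = [];
  for await (const batch of withSquare) {
    result.push(...batch);
  }
  return result;
}
```

Each mapper receives a whole batch, so the second mapper can rely on the fields added by the first.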