Nozzle is a utility library for manipulating streams of text, and in particular streamed responses from LLMs.
npm i nozzle-js # or pnpm / bun / yarn
Nozzle is written in TypeScript and ships both CJS and ESM builds.
const stream = await openai.chat.completions.create({ ...args, stream: true })
/*
# Reasoning:
3x3 is equal to 9.
# Answer:
The product is 9.
# Check:
9 / 3 = 3, so I think this answer is correct.
=>
The product is 9.
*/
// extract the section between # Answer and # Check; return the individual sentences at least 100ms apart.
return nz(stream)
.after("# Answer")
.before("# Check")
.split(/[.;,]/g)
.trim() // trim the overall response of whitespace.
.minInterval(100)
.value()
// wait, does regex work with ^? probably not, since we truncate all the time, right? // because really, .trim() should just be .replace(^\s+, '').replace(\s+$, ''). // it could also be
import { parse, STR, OBJ } from "nozzle-json";
const input = `
Sure, the object that answers your question is:
\`\`\`json
{"product": 9}
\`\`\`
`
// should have .throwifnotfound or something, as well as .throwiffound, .censor, etc?
return nz(stream)
.after("```json")
.before("```")
.trim()
.accumulate()
.map((prefix) => parse(prefix))
.pairs()
.filter(x => ) // only allow json values which have xyz
.value()
Install the library:
git clone https://github.com/Robert-Cunningham/nozzle
cd nozzle
npm i
Then run the tests:
npm run test
This library is licensed under the MIT license.
function aperture<T>(source: Iterable<T>, n: number): AsyncGenerator<T[]>;
Parameter | Type |
---|---|
source | `Iterable<T>` |
n | `number` |
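The signature above comes without a description. As a rough sketch only, assuming `aperture` follows the sliding-window convention of the same-named Ramda helper (an assumption, not something this README states), the behavior could look like the code below. `apertureSketch` is a hypothetical name, and it is written as a synchronous generator to keep the example short.

```typescript
// Hypothetical sketch, NOT the library's source.
// Assumption: aperture yields sliding windows of n consecutive items.
function* apertureSketch<T>(source: Iterable<T>, n: number): Generator<T[]> {
  const recent: T[] = []
  for (const item of source) {
    recent.push(item)
    if (recent.length > n) recent.shift() // drop the oldest item
    if (recent.length === n) yield [...recent] // emit a copy of the full window
  }
}

const pairsOfLetters = Array.from(apertureSketch(["a", "b", "c", "d"], 2))
console.log(pairsOfLetters) // value: [["a", "b"], ["b", "c"], ["c", "d"]]
```

Under this assumption, an input shorter than `n` produces no windows at all.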
function accumulate(iterator: AsyncIterable<string>): AsyncGenerator<string>;
Yields a cumulative prefix of the input stream.
const stream = accumulate(streamOf(["This ", "is ", "a ", "test!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["This ", "This is ", "This is a ", "This is a test!"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<string>` | An asynchronous iterable of strings. |
function diff(iterator: AsyncIterable<string>): AsyncGenerator<string>;
Yields the difference between the current and previous string in the input stream.
const stream = diff(streamOf(["This ", "This is ", "This is a ", "This is a test!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["This ", "is ", "a ", "test!"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<string>` | An asynchronous iterable of strings. |
function buffer<T>(source: AsyncIterable<T>, n?: number): AsyncGenerator<T>;
Buffers up to N items from the source iterator, consuming them eagerly and yielding them on demand. If n is undefined, buffers unlimited items.
The buffer() function "slurps up" as much of the input iterator as it can as fast as it can, storing items in an internal buffer. When items are requested from the buffer, they are yielded from this pre-filled buffer. This creates a decoupling between the consumption rate and the production rate.
Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md. This function serves as a reference implementation for proper error handling with background consumers.
// Buffer up to 10 items
const buffered = buffer(source, 10)
// Buffer unlimited items
const unlimited = buffer(source)
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<T>` | The async iterable source of values. |
n? | `number` | The maximum number of items to buffer. If undefined, buffers unlimited items. |
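The decoupling described above can be sketched with a background consumer that fills a queue while the reader drains it. This is a minimal illustration under stated assumptions, not the library's implementation: `bufferSketch` is a hypothetical name, it covers only the unlimited case (no `n`), and it does not attempt the full error-handling pattern from ASYNC_ERROR_HANDLING.md.

```typescript
// Hypothetical sketch, NOT the library's source: unlimited buffering only,
// with simplified error handling.
async function* bufferSketch<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  const queue: T[] = []
  const state = { finished: false, failed: false, error: undefined as unknown }
  let wake: () => void = () => {}

  // Background consumer: once started, it drains the source as fast as it can,
  // regardless of how quickly the reader asks for items.
  void (async () => {
    try {
      for await (const item of source) {
        queue.push(item)
        wake()
      }
    } catch (error) {
      state.failed = true
      state.error = error
    } finally {
      state.finished = true
      wake()
    }
  })()

  // Reader side: serve from the queue and sleep only while it is empty.
  while (true) {
    if (queue.length > 0) yield queue.shift() as T
    else if (state.failed) throw state.error
    else if (state.finished) return
    else await new Promise<void>((resolve) => (wake = resolve))
  }
}
```

Because async generators are lazy, this sketch starts draining the source on the first `next()` call rather than at construction time. Items queued before a source error are still delivered before the error is rethrown.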
function asList<T>(iterator: AsyncIterable<T>): Promise<T[]>;
Consumes an async iterator and returns all values as an array.
const result = await asList(streamOf(["Hello", "World", "!"]))
console.log(result) // => ["Hello", "World", "!"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function asString(iterator: AsyncIterable<string>): Promise<string>;
Consumes an async iterator and returns the final accumulated string. Equivalent to calling accumulate().last() but more efficient.
const result = await asString(streamOf(["Hello", " ", "World"]))
console.log(result) // => "Hello World"
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<string>` | An asynchronous iterable of strings. |
function fromList<T>(list: T[]): AsyncGenerator<T>;
Converts an array to an async iterator.
const stream = fromList(["Hello", "World", "!"])
for await (const chunk of stream) {
console.log(chunk)
}
// => "Hello", "World", "!"
Parameter | Type | Description |
---|---|---|
list | `T[]` | An array of values. |
function asyncMap<T, U>(iterator: AsyncIterable<T>, fn: (value: T) => Promise<U>): AsyncGenerator<U>;
Transforms each value from the input stream using the provided async function. Applies the async function to each item as soon as it comes off the iterator and yields results as they complete, allowing multiple function calls to run concurrently.
Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
fn | `(value: T) => Promise<U>` | An async function that transforms each value. |
const stream = asyncMap(streamOf(["hello", "world"]), async x => {
await new Promise(resolve => setTimeout(resolve, 100))
return x.toUpperCase()
})
for await (const chunk of stream) {
console.log(chunk)
}
// => ["HELLO", "WORLD"]
// Fetch data for each URL as they come in
const urls = streamOf(["api/users", "api/posts"])
const responses = asyncMap(urls, async url => {
const response = await fetch(url)
return await response.json()
})
for await (const data of responses) {
console.log(data)
}
function filter<T>(iterator: AsyncIterable<T>, predicate: (chunk: T) => boolean): AsyncGenerator<T>;
Filters the input stream based on a predicate function.
const stream = filter(streamOf(["Hello", "Hi", "World"]), (chunk: string) => chunk.length >= 5)
for await (const chunk of stream) {
console.log(chunk)
}
// => ["Hello", "World"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
predicate | `(chunk: T) => boolean` | A function that returns true for items to keep. |
function map<T, U>(iterator: AsyncIterable<T>, fn: (value: T) => U): AsyncGenerator<U>;
Transforms each value from the input stream using the provided function.
const stream = map(streamOf(["hello", "world"]), x => x.toUpperCase())
for await (const chunk of stream) {
console.log(chunk)
}
// => ["HELLO", "WORLD"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
fn | `(value: T) => U` | A function that transforms each value. |
function unwrap<T, R>(iterator: AsyncIterable<{
error?: any;
return?: R;
value?: T;
}>): AsyncGenerator<T, undefined | R, any>;
Unwraps results from wrap() back into a normal iterator that throws/returns/yields. The opposite of wrap() - takes {value, return, error} objects and converts them back to normal iterator behavior.
const wrappedStream = wrap(streamOf(["hello", "world"]))
const unwrapped = unwrap(wrappedStream)
for await (const value of unwrapped) {
console.log("Got:", value) // "hello", "world"
}
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<{ error?: any; return?: R; value?: T }>` | An asynchronous iterable of wrapped result objects. |
function wrap<T>(iterator: AsyncIterable<T>): AsyncGenerator<{
error?: unknown;
return?: any;
value?: T;
}>;
Wraps an iterator to catch any errors and return them in a result object format.
Instead of throwing, errors are yielded as `{error}` and successful values as `{value}`.
const stream = wrap(streamOf(["hello", "world"]))
for await (const result of stream) {
if (result.value !== undefined) {
console.log("Got:", result.value)
} else if (result.return !== undefined) {
console.log("Return:", result.return)
} else {
console.log("Error:", result.error)
}
}
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable. |
function compact(iterator: AsyncIterable<string>): AsyncGenerator<string>;
Filters out empty strings from the input stream.
const stream = compact(streamOf(["Hello", "", "World", ""]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["Hello", "World"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<string>` | An asynchronous iterable of strings. |
function first<T>(iterator: AsyncIterable<T>): AsyncGenerator<T>;
Yields only the first value from the input stream.
const stream = first(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["Hello"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function head<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;
Yields only the first value from the input stream.
const stream = head(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["Hello"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function initial<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;
Yields all values except the last from the input stream.
const stream = initial(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["Hello", "World"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function last<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;
Yields only the last value from the input stream.
const stream = last(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["!"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function slice<T>(
iterator: AsyncIterable<T>,
start: number,
end?: number): AsyncGenerator<T>;
Yields a slice of the input stream between start and end indices. Supports negative indices by maintaining an internal buffer.
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | The async iterable to slice. |
start | `number` | Starting index (inclusive). Negative values count from end. |
end? | `number` | Ending index (exclusive). Negative values count from end. If undefined, slices to end. |
const stream = slice(streamOf(["a", "b", "c", "d", "e"]), 1, 3)
for await (const chunk of stream) {
console.log(chunk)
}
// => ["b", "c"]
const stream = slice(streamOf(["a", "b", "c", "d", "e"]), -2)
for await (const chunk of stream) {
console.log(chunk)
}
// => ["d", "e"]
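A negative index cannot be resolved until the stream has ended, which is why an internal buffer is needed. The sketch below illustrates the idea for the `slice(stream, -count)` case only. `takeLastSketch` and `fromArray` are hypothetical names used for illustration, not part of the library.

```typescript
// Hypothetical stand-in for a stream source such as fromList().
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item
}

// Hypothetical sketch, NOT the library's source: the equivalent of slice(stream, -count).
async function* takeLastSketch<T>(source: AsyncIterable<T>, count: number): AsyncGenerator<T> {
  const recent: T[] = []
  for await (const item of source) {
    recent.push(item)
    if (recent.length > count) recent.shift() // keep only the newest `count` items
  }
  // Only after the source ends is it known which items were the last ones.
  for (const item of recent) yield item
}
```

The trade-off is that nothing is emitted until the source completes, so a negative `start` gives up the streaming behavior that a positive range keeps.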
function tail<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;
Yields all values except the first from the input stream.
const stream = tail(streamOf(["Hello", "World", "!"]))
for await (const chunk of stream) {
console.log(chunk)
}
// => ["World", "!"]
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T>` | An asynchronous iterable of values. |
function replace(
input: AsyncIterable<string>,
regex: RegExp,
replacement: string): AsyncGenerator<string>;
Replaces matches of a regex pattern with a replacement string in the input stream.
Uses earliestPossibleMatchIndex to efficiently yield tokens as soon as we know they don't match the regex, while holding back potential matches until we can determine if they should be replaced.
const stream = replace(streamOf(["a", "b", "b", "a"]), /a[ab]*a/g, "X")
for await (const chunk of stream) {
console.log(chunk)
}
// => ["X"]
Parameter | Type | Description |
---|---|---|
input | `AsyncIterable<string>` | - |
regex | `RegExp` | The regular expression pattern to match. |
replacement | `string` | The string to replace matches with. |
function tap<T, R>(iterator: AsyncIterable<T, R>, fn: (value: T) => void): AsyncGenerator<T, R, undefined>;
Executes a side effect for each value without modifying the stream.
const stream = tap(streamOf(["Hello", "World", "!"]), console.log)
for await (const chunk of stream) {
// console.log will have printed each chunk
console.log("Processed:", chunk)
}
// => logs: "Hello", "World", "!", then "Processed: Hello", "Processed: World", "Processed: !"
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterable<T, R>` | An asynchronous iterable of values. |
fn | `(value: T) => void` | A function to execute for each value. |
function tee<T>(iterator: AsyncIterator<T>, n: number): AsyncGenerator<T, any, any>[];
Splits a single iterator into N independent iterables.
Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.
Parameter | Type | Description |
---|---|---|
iterator | `AsyncIterator<T>` | The source async iterator to split. |
n | `number` | Number of independent iterables to create. |
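One way to picture the splitting is a shared source that copies every item into one queue per branch. The following is a minimal sketch under that assumption, not the library's implementation. `teeSketch` is a hypothetical name, and it skips the error-handling pattern referenced above.

```typescript
// Hypothetical sketch, NOT the library's source.
function teeSketch<T>(iterator: AsyncIterator<T>, n: number): AsyncGenerator<T>[] {
  const queues: T[][] = Array.from({ length: n }, () => [])
  let finished = false
  let pending: Promise<void> | null = null

  // Pull one item from the shared source and copy it into every branch queue.
  // Concurrent callers share the same in-flight pull.
  const pull = (): Promise<void> => {
    if (!pending) {
      pending = iterator.next().then((step) => {
        if (step.done) finished = true
        else for (const queue of queues) queue.push(step.value)
        pending = null
      })
    }
    return pending
  }

  async function* branch(queue: T[]): AsyncGenerator<T> {
    while (true) {
      if (queue.length > 0) yield queue.shift() as T
      else if (finished) return
      else await pull()
    }
  }

  return queues.map((queue) => branch(queue))
}
```

Each branch can then be consumed at its own pace. In this sketch a slow branch makes its queue grow without bound, which is the usual cost of this design.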
function after(source: StringIterable, pattern: string | RegExp): AsyncGenerator<string>;
Emit everything after the accumulated prefix that matches `pattern`.
const stream = after(streamOf(["a", "b", "c", "d", "e"]), /bc/)
for await (const chunk of stream) {
console.log(chunk)
}
// => ["d", "e"]
Parameter | Type | Description |
---|---|---|
source | `StringIterable` | Stream or iterable to scan. |
pattern | `string \| RegExp` | The first match of this string or RegExp marks the cut-off. |
function before(source: StringIterable, separator: string | RegExp): AsyncGenerator<string>;
Emit everything before the accumulated prefix that contains `separator`.
const stream = before(streamOf(["a", "b", "c", "d", "e"]), "cd")
for await (const chunk of stream) {
console.log(chunk)
}
// => ["a", "b"]
Parameter | Type | Description |
---|---|---|
source | `StringIterable` | Stream or iterable to scan. |
separator | `string \| RegExp` | String or RegExp that marks the cut-off. |
function chunk(source: AsyncIterable<string>, size: number): AsyncGenerator<string>;
Groups input tokens into chunks of the specified size and yields the joined result. Takes N input items and yields N/size output items, where each output is the concatenation of size input items.
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<string>` | The async iterable source of strings (tokens). |
size | `number` | The number of input tokens to group together in each output chunk. |
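The grouping can be sketched in a few lines. This is an illustration rather than the library's code: `chunkSketch` and `fromArray` are hypothetical names, and flushing a trailing partial group is an assumption, since the description above does not say what happens when the token count is not a multiple of `size`.

```typescript
// Hypothetical stand-in for a stream source such as fromList().
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item
}

// Hypothetical sketch, NOT the library's source.
async function* chunkSketch(source: AsyncIterable<string>, size: number): AsyncGenerator<string> {
  let group: string[] = []
  for await (const token of source) {
    group.push(token)
    if (group.length === size) {
      yield group.join("") // a full group: emit the concatenation
      group = []
    }
  }
  // Assumption: a trailing partial group is emitted rather than dropped.
  if (group.length > 0) yield group.join("")
}
```

With five single-letter tokens and `size` 2, this sketch yields `"ab"`, `"cd"`, and then the leftover `"e"`.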
function split(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;
Takes incoming chunks, merges them, and then splits them by a string separator.
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<string>` | The async iterable source of strings. |
separator | `string \| RegExp` | The string or RegExp separator to split by. |
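The "merge, then split" behavior matters because a separator can straddle two incoming chunks. The sketch below shows one way to handle that for a plain string separator. `splitSketch` and `fromArray` are hypothetical names, and this is not the library's implementation (RegExp separators are left out).

```typescript
// Hypothetical stand-in for a stream source such as fromList().
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item
}

// Hypothetical sketch, NOT the library's source: string separators only.
async function* splitSketch(source: AsyncIterable<string>, separator: string): AsyncGenerator<string> {
  let held = ""
  for await (const piece of source) {
    held += piece
    const parts = held.split(separator)
    // The last part may still grow, or may end in half a separator, so hold it back.
    held = parts.pop() ?? ""
    for (const part of parts) yield part
  }
  yield held // whatever remains once the source ends
}
```

For example, the chunks `"one-"`, `"-two--th"`, `"ree"` split on `"--"` come out as `"one"`, `"two"`, `"three"`, even though the first separator arrives in two halves.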
function splitAfter(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;
Takes incoming chunks, merges them, and then splits them by a string separator, keeping the separator at the end of each part (except the last).
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<string>` | The async iterable source of strings. |
separator | `string \| RegExp` | The string or RegExp separator to split by. |
function splitBefore(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;
Takes incoming chunks, merges them, and then splits them by a string separator, keeping the separator at the beginning of each part (except the first).
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<string>` | The async iterable source of strings. |
separator | `string \| RegExp` | The string or RegExp separator to split by. |
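The difference between the three split variants is easiest to see on an already-merged string. The snippet below uses only the built-in `String.prototype.split` with lookbehind and lookahead patterns to reproduce the three output shapes. It illustrates the results described above, not how the library computes them.

```typescript
const sampleText = "one. two. three"

// split: the separator is dropped.
const dropped = sampleText.split(". ") // ["one", "two", "three"]

// splitAfter shape: the separator stays at the end of each part except the last.
const keptAtEnd = sampleText.split(/(?<=\. )/) // ["one. ", "two. ", "three"]

// splitBefore shape: the separator stays at the start of each part except the first.
const keptAtStart = sampleText.split(/(?=\. )/) // ["one", ". two", ". three"]

console.log(dropped, keptAtEnd, keptAtStart)
```

In both of the separator-keeping shapes, joining the parts back together reproduces the original text exactly.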
function minInterval<T>(source: AsyncIterable<T>, delayMs: number): AsyncGenerator<T>;
Enforces a minimum delay between adjacent tokens in a stream.
The first token is yielded immediately, then subsequent tokens are delayed to ensure at least `delayMs` milliseconds pass between each yield.
Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<T>` | The async iterable source of tokens. |
delayMs | `number` | The minimum delay in milliseconds between adjacent tokens. |
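The timing rule above can be sketched by remembering when the previous token was yielded and sleeping for whatever part of `delayMs` has not yet elapsed. This is a simplified illustration, not the library's implementation: `minIntervalSketch` and `sleep` are hypothetical names, and the error-handling pattern referenced above is omitted.

```typescript
const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical sketch, NOT the library's source.
async function* minIntervalSketch<T>(source: AsyncIterable<T>, delayMs: number): AsyncGenerator<T> {
  let lastYield = -Infinity // no previous token, so the first one is never delayed
  for await (const item of source) {
    const remaining = lastYield + delayMs - Date.now()
    if (remaining > 0) await sleep(remaining) // wait out the rest of the interval
    lastYield = Date.now()
    yield item
  }
}
```

Tokens that already arrive more than `delayMs` apart pass through without any added delay, since `remaining` is then zero or negative.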
function throttle<T>(
source: AsyncIterable<T>,
intervalMs: number,
merge: (values: T[]) => T): AsyncGenerator<T>;
Throttles the output from a source, with special timing behavior:
- The first chunk is yielded immediately
- Subsequent chunks are batched and yielded together after the interval
- If no chunks arrive during an interval, the next chunk is yielded immediately when it arrives
Error handling follows the pattern described in file://./../../ASYNC_ERROR_HANDLING.md to ensure errors are thrown during await ticks for proper try/catch handling.
Parameter | Type | Description |
---|---|---|
source | `AsyncIterable<T>` | The async iterable source of values. |
intervalMs | `number` | The throttling interval in milliseconds. |
merge | `(values: T[]) => T` | - |
function flatten<T>(src: Iterable<Iterable<T> | T[]>): AsyncGenerator<T>;
Flattens nested arrays or iterables into a single stream.
const stream = fromList([["a", "b"], ["c", "d"], ["e"]])
const flattened = flatten(stream)
for await (const chunk of flattened) {
console.log(chunk)
}
// => "a", "b", "c", "d", "e"
Parameter | Type | Description |
---|---|---|
src | `Iterable<Iterable<T> \| T[]>` | The source iterable containing nested arrays or iterables. |