A small functional library for processing "data flows" in JavaScript (more examples on ObservableHQ). Highlights:
- Lazy Evaluation - Implemented using modern JavaScript generators and iterators
- Memory Efficient - Data "streams" from one "activity" to the next
- Functional - Pure functional implementation.
- Fully Typed - Written in TypeScript and supports typed chaining of functional activities
- UMD/ES6 Bundles - Works in NodeJS / Browser and includes ES6 modules to ensure you only include what you use (when bundling with RollupJS / Webpack etc.)
The underlying motivation for this library is to simplify the processing of data in an efficient way. The analogy we use is one of a "data" pipe, which consists of:
- Activities: Functional components that modify data as it flows through the pipe.
- Sensors: Functional components that observe the data as it passes through the pipe.
Some other properties of pipes are:
- Can be defined before being used.
- A complex pipe is just another "Activity" and as such can be re-used inside other pipes.
- Encourages the user to iterate through the source data only ONCE, allowing for less memory use and better overall performance.
- Activity - A functional unit of work that is primarily used to alter the data (map, filter, sort, ...).
- Sensor - A function which "observes" the data without modifying it (min, max, quartile, ...).
- IterableActivity - An "Activity" which produces an "Iterable" output (map, filter, sort, ...).
- ScalarActivity - An "Activity" which produces a single value (min, max, reduce, ...).
- Process or Pipeline - A series of "Activities" chained together, so that "data" "flows" through the process / pipeline.
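To make the Activity and Pipeline terms concrete, here is a minimal sketch built only on plain generators. It is not the library's implementation; the names lazyMap, lazyFilter and compose are hypothetical and exist only for this illustration.

```javascript
// Hypothetical helpers (lazyMap, lazyFilter, compose); NOT exports of @hpcc-js/dataflow.

// An "iterable activity": takes an iterable and returns a lazy iterable.
const lazyMap = (fn) => function* (source) {
    let idx = 0;
    for (const row of source) yield fn(row, idx++);
};

const lazyFilter = (predicate) => function* (source) {
    let idx = 0;
    for (const row of source) {
        if (predicate(row, idx++)) yield row;
    }
};

// Chaining activities produces another activity, so pipes can be nested.
const compose = (...activities) => (source) =>
    activities.reduce((iterable, activity) => activity(iterable), source);

const evens = compose(lazyFilter(n => n % 2 === 0));
const evensSquared = compose(evens, lazyMap(n => n * n)); // Re-uses "evens" as an activity

const result = [...evensSquared([1, 2, 3, 4, 5, 6])];
console.log(result); // => [4, 16, 36]
```

Because compose returns a function with the same shape as any single activity, a finished pipe can be dropped into a larger one without special handling.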
Simple example of data flowing through a pipe of activities: filter -> map -> filter -> first
import { count, filter, first, generate, map, max, pipe, sensor } from "@hpcc-js/dataflow";
const c1 = count();
const c2 = count();
const c3 = count();
const m1 = max(row => row.value);
const p1 = pipe(
sensor(c1), // Keep running count of input
filter(n => n <= 0.5), // Filter out numbers > 0.5
sensor(c2), // Keep running count of filtered rows
map((n, idx) => // Convert to JSON Object
({ index: idx, value: n })),
filter(row => row.index % 2 === 0), // Keep rows with even indices
sensor(c3), // Keep running count of final rows
sensor(m1), // Track largest value
first(3) // Take first 3 rows
);
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}`);
// [1] => Counts: undefined, undefined, undefined
const outIterable = p1(generate(Math.random, 1000));
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}`);
// [2] => Counts: undefined, undefined, undefined
console.log(JSON.stringify([...outIterable]));
// [3] => [{"index":0,"value":0.19075931906641008},{"index":2,"value":0.4873469062925415},{"index":4,"value":0.4412516774100035}]
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
// [4] => Counts: 6, 5, 3, 0.4873469062925415
const outArray = [...p1([0.7, 0.5, 0.4, 0.8, 0.3, 1])];
console.log(JSON.stringify(outArray));
// [5] => [{"index":0,"value":0.5},{"index":2,"value":0.3}]
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
// [6] => Counts: 6, 3, 2, 0.5
Notes:
- [1] All sensors are undefined, as expected.
- [2] All sensors are still undefined, as p1(generate(Math.random, 1000)) only returns an IterableIterator. In other words, no data has flowed through the pipe yet.
- [3] [...outIterable] is a shorthand way to populate an array with data from an iterable.
- [4] The sensors now have values we can peek at!
- [5] The pipe p1 can be reused with new data; this time the input is a simple array.
- [6] The same sensors reflect the correct state from the second run.
Furthermore, the sensors can be observed at any point during the process:
for (const row of p1(generate(Math.random, 1000000))) {
console.log(`${row.index}: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
}
// => 0: 1, 1, 1, 0.13662528848681
// => 2: 3, 3, 2, 0.13662528848681
// => 4: 7, 5, 3, 0.4328468228869129
Note: Even though there are 1000000 rows of data potentially being generated, only 7 are actually read for this run.
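The small read count follows from how generators pull data on demand. The sketch below reproduces the effect with plain generator functions and a deterministic source so the numbers are repeatable. All names here (source, keepMultiplesOf3, take, reads) are hypothetical and are not part of the library.

```javascript
// Plain-generator sketch with hypothetical names; NOT the library's code.
let reads = 0;

// A source that could produce one million rows; counts each row actually produced.
function* source() {
    for (let i = 0; i < 1000000; ++i) {
        ++reads;
        yield i;
    }
}

function* keepMultiplesOf3(rows) {
    for (const row of rows) {
        if (row % 3 === 0) yield row;
    }
}

function* take(rows, n) {
    if (n <= 0) return;
    let taken = 0;
    for (const row of rows) {
        yield row;
        if (++taken >= n) return; // Stop pulling from upstream once n rows were emitted
    }
}

const firstThree = [...take(keepMultiplesOf3(source()), 3)];
console.log(firstThree); // => [0, 3, 6]
console.log(reads);      // => 7
```

Each downstream step asks its upstream for one row at a time, so once take has emitted its third row nothing further is requested from the source.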
Functions which alter data inside the dataflow pipe
# concat(iterable, iterable): iterable
# concat(iterable): (iterable) => iterable
Concatenates two iterables into a single iterable. Similar to Array.concat.
concat(["a", "b", "c"], ["d", "e", "f"]); // => "a", "b", "c", "d", "e", "f"
const concatDEF = concat(["d", "e", "f"]);
concatDEF(["a", "b", "c"]); // => "a", "b", "c", "d", "e", "f"
concatDEF(["1", "2", "3"]); // => "1", "2", "3", "d", "e", "f"
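Most activities in this section come in the same two call forms: one that runs immediately on an iterable, and one that returns a reusable function. One way such a dual signature could be written is sketched below; this is an assumption about the pattern, not the library's source, and concatSketch, concatGen and isIterable are hypothetical names.

```javascript
// Hypothetical sketch of the two call forms; NOT the library's implementation.
function* concatGen(head, tail) {
    yield* head;
    yield* tail;
}

const isIterable = (x) => x != null && typeof x[Symbol.iterator] === "function";

function concatSketch(a, b) {
    // Two iterables: concatenate immediately.
    // One iterable: return an activity that appends it to whatever source it is given.
    return isIterable(b)
        ? concatGen(a, b)
        : (source) => concatGen(source, a);
}

const immediate = [...concatSketch(["a", "b", "c"], ["d", "e", "f"])];
const appendDEF = concatSketch(["d", "e", "f"]);
const reused = [...appendDEF(["1", "2", "3"])];

console.log(immediate.join("")); // => abcdef
console.log(reused.join(""));    // => 123def
```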
# each(iterable, callbackFn): iterable
# each(callbackFn): (iterable) => iterable
Performs a callback for each row in an iterable. Cannot alter the iterable value. Similar to Array.forEach. Useful for debugging steps in a pipe.
each(["a", "b", "c"], (row, idx) => console.log(row)); // => "a", "b", "c"
const logFlow = each(console.log);
logFlow(["a", "b", "c"]); // => "a", "b", "c"
# entries(iterable): iterable
# entries(): (iterable) => iterable
Produces an [index, row] pair for each row in an iterable. Similar to Array.entries.
entries(["a", "b", "c"]); // => [0, "a"], [1, "b"], [2, "c"]
const calcEntries = entries();
calcEntries(["a", "b", "c"]); // => [0, "a"], [1, "b"], [2, "c"]
# filter(iterable, condition): iterable
# filter(condition): (iterable) => iterable
Filter iterable based on some condition. Similar to Array.filter.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
filter(words, word => word.length > 6); // => "exuberant", "destruction", "present"
const smallWords = filter(word => word.length <= 6);
smallWords(words); // => "spray", "limit", "elite"
# first(iterable, number): iterable
# first(number): (iterable) => iterable
Limit the flow to the first N rows of data.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
first(words, 3); // => "spray", "limit", "elite"
const first2 = first(2);
first2(words); // => "spray", "limit"
# group(iterable, condition): iterable
# group(condition): (iterable) => iterable
Groups data based on some grouping condition. Output is in the form {key: groupCondition, value:[...]}, where the key has to be either a number or a string.
const words = ["one", "two", "three", "four", "five", "six"];
group(words, word => word.length); // => {key: 3, value: ["one", "two", "six"]}, {key: 4, value: ["four", "five"]}, { key: 5, value: ["three"]}
const groupByLength = group(word => word.length);
groupByLength(words); // => {key: 3, value: ["one", "two", "six"]}, {key: 4, value: ["four", "five"]}, { key: 5, value: ["three"]}
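As an illustration of the grouping step, here is a minimal sketch that collects rows into a Map keyed by the grouping condition. groupSketch is a hypothetical name, and this is not the library's implementation: in particular, the sketch emits groups in first-seen key order, which differs from the ordering shown in the example above.

```javascript
// Hypothetical sketch; NOT the library's implementation.
function* groupSketch(source, keyFn) {
    const groups = new Map();
    // Every row must be read before the first group can be emitted.
    for (const row of source) {
        const key = keyFn(row);
        if (!groups.has(key)) groups.set(key, []);
        groups.get(key).push(row);
    }
    // Map iterates in insertion order, i.e. the order keys were first seen.
    for (const [key, value] of groups) yield { key, value };
}

const grouped = [...groupSketch(["one", "two", "three", "four", "five", "six"], w => w.length)];
console.log(JSON.stringify(grouped));
// => [{"key":3,"value":["one","two","six"]},{"key":5,"value":["three"]},{"key":4,"value":["four","five"]}]
```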
# histogram(iterable, condition, options): iterable
# histogram(condition, options): (iterable) => iterable
Groups data into buckets (or bins) based on numeric ranges. Output is in the form {from: numeric, to: numeric, value:[...]}.
Available options are:
{ buckets: number } // Specify number of buckets / bins
or
{ min: number, range: number } // Specify starting bucket (min) and size of bucket (range)
const data = [1, 12, 13, 13, 3, 14, 19, 6];
histogram(data, n => n, { buckets: 3 }); // => {"from":1,"to":7,"value":[1,3,6]},{"from":7,"to":13,"value":[12]},{"from":13,"to":19,"value":[13,13,14,19]}
histogram(data, n => n, { min: 0, range: 5 }); // => {"from":0,"to":5,"value":[1,3]},{"from":5,"to":10,"value":[6]},{"from":10,"to":15,"value":[12,13,13,14]},{"from":15,"to":20,"value":[19]}
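The bucket boundaries in the two results above can be reproduced with simple arithmetic. The sketch below is one way to do it, assuming non-empty input, values that are all greater than or equal to min, and that the largest value is placed in the last bucket. histogramSketch is a hypothetical name and the library may compute its bins differently.

```javascript
// Hypothetical sketch of the bucket arithmetic; NOT the library's implementation.
// Assumes non-empty data and (for the min/range form) no values below min.
function histogramSketch(data, accessor, options) {
    const rows = [...data];
    const values = rows.map(accessor);
    const maxValue = Math.max(...values);
    let min, width, count;
    if (options.buckets !== undefined) {
        // Fixed number of buckets: split [min, max] into equal widths.
        min = Math.min(...values);
        count = options.buckets;
        width = (maxValue - min) / count;
    } else {
        // Fixed start and width: create as many buckets as the data needs.
        min = options.min;
        width = options.range;
        count = Math.floor((maxValue - min) / width) + 1;
    }
    const bins = Array.from({ length: count }, (_, i) => ({
        from: min + i * width,
        to: min + (i + 1) * width,
        value: []
    }));
    rows.forEach((row, i) => {
        // Clamp so the maximum value lands in the last bucket; width 0 means one shared bucket.
        const idx = width > 0 ? Math.min(Math.floor((values[i] - min) / width), count - 1) : 0;
        bins[idx].value.push(row);
    });
    return bins;
}

const sample = [1, 12, 13, 13, 3, 14, 19, 6];
const byCount = histogramSketch(sample, n => n, { buckets: 3 });
const byRange = histogramSketch(sample, n => n, { min: 0, range: 5 });
console.log(JSON.stringify(byCount));
console.log(JSON.stringify(byRange));
```

With { buckets: 3 } the width is (19 - 1) / 3 = 6, giving the boundaries 1, 7, 13 and 19 seen above.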
# map(iterable, callback): iterable
# map(callback): (iterable) => iterable
Map data to a new shape via a callback function. Similar to Array.map.
map([{ n: 22 }, { n: 11 }, { n: 33 }], (row, idx) => ({ ...row, index: idx })); // => { n: 22, index: 0 }, { n: 11, index: 1 }, { n: 33, index: 2 }
const indexData = map((row, idx) => ({ ...row, index: idx + 1 }));
indexData([{ n: 22 }, { n: 11 }, { n: 33 }]); // => { n: 22, index: 1 }, { n: 11, index: 2 }, { n: 33, index: 3 }
# skip(iterable, number): iterable
# skip(number): (iterable) => iterable
Skip a set number of rows.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
skip(words, 3); // => "exuberant", "destruction", "present"
const skip4 = skip(4);
skip4(words); // => "destruction", "present"
# sort(iterable, compare): iterable
# sort(compare): (iterable) => iterable
Sort iterable based on result of compare function (should return a negative number, zero, or a positive number). Similar to Array.sort.
const numbers = [4, 2, 5, 1, 3];
sort(numbers, (a, b) => a - b); // => 1, 2, 3, 4, 5
const reverseSort = sort((a, b) => b - a);
reverseSort(numbers); // => 5, 4, 3, 2, 1
A collection of "Observers" which can be adapted as functions, activities and sensors
export interface Observer<T, U> {
observe(r: T, idx: number): void;
peek(): U;
}
# sensor(_: Observer): iterable
Adapts an observer so it can be used in a pipe.
# scalar(_: Observer): any
Adapts an observer so it can be called as a regular function.
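To show how one observer can serve both roles, here is a minimal sketch of the two adapters written against the Observer interface above. The names countSketch, sensorSketch and scalarSketch are hypothetical, and the sketch does not model how the library resets sensors between runs.

```javascript
// Hypothetical re-implementation for illustration; NOT the library's code.
const countSketch = () => {
    let total; // Stays undefined until the first row is observed
    return {
        observe(row, idx) { total = (total ?? 0) + 1; },
        peek() { return total; }
    };
};

// Adapt an observer into a pass-through activity for use inside a pipe.
const sensorSketch = (observer) => function* (source) {
    let idx = 0;
    for (const row of source) {
        observer.observe(row, idx++);
        yield row; // Rows pass through unchanged
    }
};

// Adapt an observer into a regular function that returns a single value.
const scalarSketch = (observer) => (source) => {
    let idx = 0;
    for (const row of source) observer.observe(row, idx++);
    return observer.peek();
};

const seen = countSketch();
const passed = [...sensorSketch(seen)(["a", "b", "c"])];
const total = scalarSketch(countSketch())([5, 1, 2, -3, 4]);

console.log(passed.join(","), seen.peek()); // => a,b,c 3
console.log(total);                         // => 5
```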
# count(): Observer
Counts the number of "observed" rows:
const s1 = count();
const s2 = count();
const p1 = pipe(
sensor(s1),
filter(r => r.age > 30),
sensor(s2),
);
const data = [...p1(population)];
s1.peek(); // => 1000;
s2.peek(); // => 699;
const doCount = scalar(count());
doCount([5, 1, 2, -3, 4]); // => 5
# min(): Observer
# min(accessor): Observer
Calculates minimal value for "observed" rows:
const s1 = min();
const s2 = min();
const p1 = pipe(
sensor(s1),
filter(r => r > 3),
sensor(s2),
);
const data = [...p1([1, 2, 3, 4, 5, 0])];
s1.peek() // => 0
s2.peek() // => 4
const calcMin = scalar(min(row => row.id));
calcMin([{ id: 22 }, { id: 44 }, { id: 33 }]); // => 22
# max(): Observer
# max(accessor): Observer
Calculates maximum value for "observed" rows:
const s1 = max();
const s2 = max();
const p1 = pipe(
sensor(s1),
filter(r => r < 3),
sensor(s2),
);
const data = [...p1([1, 2, 3, 4, 5, 0])];
s1.peek() // => 5
s2.peek() // => 2
const calcMax = scalar(max(row => row.id));
calcMax([{ id: 22 }, { id: 44 }, { id: 33 }]); // => 44
# extent(): Observer
# extent(accessor): Observer
Calculates extent (min + max) values for "observed" rows:
const s1 = extent(r => r.age);
const s2 = extent(r => r.age);
const p1 = pipe(
sensor(s1),
filter(r => r.age > 30),
sensor(s2),
);
const data = [...p1(population)];
s1.peek() // => [16, 66]
s2.peek() // => [31, 66]
const calcExtent = scalar(extent(row => row.id));
calcExtent([{ id: 22 }, { id: 44 }, { id: 33 }]); // => [22, 44]
# mean(): Observer
# mean(accessor): Observer
Calculates mean (average) value for "observed" rows:
const calcMean = scalar(mean());
calcMean([5, -6, 1, 2, -2]) // => 0
# median(): Observer
# median(accessor): Observer
Calculates median value for "observed" rows:
const calcMedian = scalar(median());
calcMedian([-6, -2, 1, 2, 5]) // => 1
calcMedian([5, -6, 1, 2, -2]) // => 1
calcMedian([-6, -2, 1, 2, 5, 6]) // => 1.5
calcMedian([5, -6, 1, 2, -2, 6]) // => 1.5
calcMedian([9]) // => 9
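For reference, the results above match the usual definition of the median: the middle value of the sorted data, or the average of the two middle values when the count is even. A minimal sketch, assuming numeric input, is shown below. medianSketch is a hypothetical name and not the library's implementation. Unlike count, min or max, an exact median generally requires retaining the observed values.

```javascript
// Hypothetical sketch; NOT the library's implementation.
const medianSketch = (values) => {
    const sorted = [...values].sort((a, b) => a - b); // Copy, then sort numerically
    if (sorted.length === 0) return undefined;
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 === 1
        ? sorted[mid]                          // Odd count: the middle value
        : (sorted[mid - 1] + sorted[mid]) / 2; // Even count: average of the two middle values
};

console.log(medianSketch([5, -6, 1, 2, -2]));    // => 1
console.log(medianSketch([5, -6, 1, 2, -2, 6])); // => 1.5
console.log(medianSketch([9]));                  // => 9
```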
# quartile(): Observer
# quartile(accessor): Observer
Calculates quartile value for "observed" rows:
const calcQuartile = scalar(quartile());
calcQuartile([6, 7, 15, 36, 39, 40, 41, 42, 43, 47, 49]) // => [6, 15, 40, 43, 49]
calcQuartile([7, 15, 36, 39, 40, 41]) // => [7, 15, 37.5, 40, 41]
calcQuartile([1, 22, 133]) // => [1, 1, 22, 133, 133]
calcQuartile([2, 144, 33]) // => [2, 2, 33, 144, 144]
# reduce(reducer[, initialValue]): Observer
Calculates reduced value for "observed" rows:
const reduceFunc = (prev, row) => prev + row;
const calcReduce1 = scalar(reduce(reduceFunc));
const calcReduce2 = scalar(reduce(reduceFunc, 10));
calcReduce1([1, 2, 3, 4, 5]) // => 15
calcReduce2([1, 2, 3, 4, 5]) // => 25
# variance(): Observer
# variance(accessor): Observer
Calculates the variance for the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcVariance = scalar(variance());
calcVariance([5, 1, 2, 3, 4]) // => 2.5
# deviation(): Observer
# deviation(accessor): Observer
Calculates the standard deviation for the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcDeviation = scalar(deviation());
calcDeviation([5, 1, 2, 3, 4]) // => 1.58113883008 == sqrt(2.5)
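The value 2.5 for [5, 1, 2, 3, 4] corresponds to the sample variance (sum of squared deviations divided by n - 1, here 10 / 4). One way an observer could compute this in a single pass without storing the rows is Welford's online algorithm, sketched below. Whether the library uses this technique is an assumption; varianceSketch is a hypothetical name.

```javascript
// Hypothetical single-pass observer using Welford's algorithm; NOT the library's code.
const varianceSketch = () => {
    let n = 0;    // Rows observed so far
    let mean = 0; // Running mean
    let m2 = 0;   // Running sum of squared deviations from the mean
    return {
        observe(x) {
            ++n;
            const delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean);
        },
        // Sample variance; undefined until at least two rows were observed
        peek() { return n < 2 ? undefined : m2 / (n - 1); }
    };
};

const v = varianceSketch();
for (const x of [5, 1, 2, 3, 4]) v.observe(x);

console.log(v.peek().toFixed(4));            // => 2.5000
console.log(Math.sqrt(v.peek()).toFixed(4)); // => 1.5811
```

The standard deviation is simply the square root of the value returned by peek.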
# distribution(): Observer<number, { min: number, mean: number, max: number, deviation: number, variance: number}>
# distribution(accessor): Observer<any, { min: number, mean: number, max: number, deviation: number, variance: number}>
Calculates a "distribution" (a combination of min, max, mean, variance and deviation) of the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcDistribution = scalar(distribution());
calcDistribution([5, 1, 2, 3, 4]) // => { min: 1, mean: 3, max: 5, deviation: Math.sqrt(2.5), variance: 2.5}
Convenience functions
# pipe(iterable, ...iterableActivity): iterable
# pipe(iterable, ...iterableActivity, scalarActivity): scalar
# pipe(...iterableActivity): iterableActivity
# pipe(...iterableActivity, scalarActivity): scalarActivity
Pipes a series of activities into a single process pipeline.
// Iterable output
pipe([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
filter(n => n <= 5),
map((n, idx) => ({ index: idx, value: n })),
filter(row => row.index % 2 === 0),
sort((l, r) => l.value - r.value),
first(3)
); // => { index: 0, value: 0 }, { index: 2, value: 2 }, { index: 4, value: 4 }
const process = pipe(
filter(n => n <= 5),
map((n, idx) => ({ index: idx, value: n })),
filter(row => row.index % 2 === 0),
sort((l, r) => l.value - r.value),
first(3)
);
process([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])// => { index: 0, value: 0 }, { index: 2, value: 2 }, { index: 4, value: 4 }
// Scalar output
pipe([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
process,
max(row => row.value)
); // => 4
const process_2 = pipe(
process,
min(row => row.value)
);
process_2([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); // => 0
# generate(generatorFn[, maxIterations]): iterable
Generates an iterable data set. Optionally limits the length to maxIterations.
// Iterable output
generate(Math.random); // => Random number iterator
generate(Math.random, 100); // => Random number iterator limited to 100 items
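A generator function is a natural fit for this kind of source. The sketch below shows one possible shape, assuming generatorFn is called with no arguments; generateSketch is a hypothetical name and not the library's implementation.

```javascript
// Hypothetical sketch; NOT the library's implementation.
function* generateSketch(generatorFn, maxIterations) {
    // Without maxIterations the loop never ends on its own; the consumer decides when to stop.
    for (let i = 0; maxIterations === undefined || i < maxIterations; ++i) {
        yield generatorFn();
    }
}

let n = 0;
const firstFive = [...generateSketch(() => ++n, 5)];
console.log(firstFive.join(", ")); // => 1, 2, 3, 4, 5

const unbounded = generateSketch(() => "tick");
console.log(unbounded.next().value); // => tick
```

Spreading an unbounded generator into an array would never finish, so an unbounded source should be combined with a limiting step such as first(n).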