dataflow-api

JavaScript API for dataflow processing using the vega-dataflow reactive engine. Perform common database operations (sorting, filtering, aggregation, window calculations) over JavaScript objects. Build and compose transformation pipelines over streaming data.

Installing

If you use NPM, npm install dataflow-api. Otherwise, download the latest release. You can also load directly from GitHub as a standalone library. AMD, CommonJS, and vanilla environments are supported. In vanilla, a df global is exported:

<script src="https://vega.github.io/dataflow-api/dataflow-api.v0.min.js"></script>
<script>
var flow = df.dataflow([
  df.aggregate()
    .groupby(['category'])
    .measure([df.count(), df.sum('amount').as('sum')])
]);
 
flow.insert([
  {category: 'a', amount: 12},
  {category: 'a', amount: 5},
  {category: 'b', amount: 11}
]);
 
// [{category: 'a', count: 2, sum: 17}, {category: 'b', count: 1, sum: 11}]
console.log(flow.values());
</script> 

API Reference

Dataflows

A dataflow is a processing pipeline that consists of a sequence of data transforms. A dataflow can either be a standalone dataflow that allows data objects to be added or removed, or a derived dataflow that processes the output of an upstream flow. All dataflows are reactive: they automatically re-evaluate upon changes to input data or upstream flows.

# df.dataflow([source,] transforms) <>

Creates and returns a new dataflow. The required transforms parameter is an array of transform descriptions. To create a dataflow that accepts input data, the transforms array should be provided as the sole argument. To instead create a derived flow, the first argument should be a source dataflow which the new dataflow will consume.
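
For example, a derived flow can post-process the output of another flow. A minimal sketch (the flow and field names here are illustrative):

// standalone flow that accepts input data
var source = df.dataflow([
  df.aggregate(['category'], [df.sum('amount').as('total')])
]);

// derived flow that filters the aggregate output of the source flow
var derived = df.dataflow(source, [
  df.filter(df.expr(d => d.total > 10).fields(['total']))
]);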

# dataflow.values() <>

Returns the output array of data objects for the dataflow. To avoid a potentially expensive data copy, the values array is the same instance used internally by the dataflow. Making modifications to the array or any contained data objects may corrupt the state of the dataflow, affecting future updates.

# dataflow.insert(data) <>

Inserts one or more input data objects into the dataflow. The input data to insert can either be a single data object or an array of objects. Upon insertion, the dataflow is automatically re-evaluated, potentially changing the output values. Note that derived dataflows do not support an insert method.

# dataflow.remove(data) <>

Removes one or more input data objects from the dataflow. The input data to remove can either be a single data object or an array of objects. The data to remove must have already been passed as input to the dataflow via the insert method; if not, the resulting behavior is undefined. Upon removal, the dataflow is automatically re-evaluated, potentially changing the output values. Note that derived dataflows do not support a remove method.
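
For instance, removing a previously inserted object triggers re-evaluation of the output; a minimal sketch using the default count aggregate:

var flow = df.dataflow([df.aggregate(['category'])]);

var a = {category: 'a', amount: 12},
    b = {category: 'b', amount: 11};

flow.insert([a, b]);
// [{category: 'a', count: 1}, {category: 'b', count: 1}]

flow.remove(b); // b was previously passed to insert
// [{category: 'a', count: 1}]
console.log(flow.values());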

# dataflow.on(callback) <>

Adds a listener callback function that is invoked when the dataflow output values update. The callback is invoked within a setTimeout call after dataflow execution completes. To subsequently remove the listener, use the off method.

The callback function is invoked with a single argument containing the array of output data values. To avoid a potentially expensive data copy, the values array is the same instance used internally by the dataflow. Making modifications to the array or any contained data objects may corrupt the state of the dataflow, affecting future updates.

dataflow.on(function(values) {
  // this method is invoked when the output values update
  // the values array is from the internal dataflow state and is *not* copied
  // make a defensive copy if you wish to modify the array
  console.log(values);
});

# dataflow.off(callback) <>

Removes a listener callback function that was added using the on method.
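
Keep a reference to the callback passed to on, as off presumably matches listeners by function reference:

function listener(values) {
  console.log('updated:', values.length);
}

flow.on(listener);  // start listening for output updates
flow.off(listener); // later: remove the same function reference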

Transforms

Transform operators that process data within a dataflow:

  • aggregate - Group and summarize data objects.
  • bin - Discretize numeric values into uniform bins.
  • countpattern - Count the frequency of patterns in text strings.
  • filter - Filter a data stream using a predicate expression.
  • fold - Collapse selected data fields into key and value properties.
  • formula - Extend data objects with derived fields using a formula expression.
  • joinaggregate - Extend data objects with calculated aggregate values.
  • project - Generate derived data objects with a selected set of fields.
  • sample - Randomly sample a subset of data objects.
  • sort - Sort data objects by the specified fields.
  • window - Calculate over ordered groups, including ranking and running totals.

# df.aggregate([groupby, measure]) <>

Creates a new aggregate transform specification. The aggregate transform groups and summarizes an input data stream to produce a new output stream. Aggregate transforms can be used to compute counts, sums, averages and other descriptive statistics over groups of data objects. The optional arguments groupby and measure are shorthands for the corresponding parameter methods.

Example Use

// Generate new data objects for each per-category count and amount sum
df.aggregate()
  .groupby(['category'])
  .measure([df.count().as('cnt'), df.sum('amount')])
 
// Identical specification using shorthand arguments
df.aggregate(['category'], [df.count().as('cnt'), df.sum('amount')])
 
// Identical specification using measure object notation
df.aggregate(['category'], [
  {op: 'count', as: 'cnt'},
  {op: 'sum', field: 'amount'}
])

Aggregate Parameters

Name Type Description
groupby Array < Field > The data fields to group by. If not specified, a single group containing all data objects will be used.
measure Array < Measure > The aggregate measures to compute. If not specified, a single count aggregate is performed. The measures can use any supported aggregate operation.
cross Boolean Indicates if the full cross-product of all groupby values should be included in the aggregate output (default false). If true, all possible combinations of groupby field values will be considered and zero count groups will be generated and returned for combinations that do not occur in the data itself. Cross-product output acts as if the drop parameter is false. In the case of streaming updates, the number of output groups will increase if new groupby field values are observed; all prior groups will be retained. This parameter can be useful for generating facets that include groups for all possible partitions of the data.
drop Boolean Indicates if empty (zero count) groups should be dropped (default true). When a data stream changes, aggregation groups may become empty. By default, the group is removed from the output. However, in some cases (such as histograms), one may wish to retain empty groups.

# df.bin([field]) <>

Creates a new bin transform specification. The bin transform discretizes numeric values into a set of bins. A common use case is to create a histogram. The optional argument field is a shorthand for the corresponding parameter method.

Example Use

// Bin the 'amount' field, up to a maximum of 30 bins
// Write the bin boundaries to the fields 'bin_start' and 'bin_end'
df.bin().field('amount').maxbins(30).as(['bin_start', 'bin_end'])
 
// Identical specification using shorthand arguments
df.bin('amount').maxbins(30).as(['bin_start', 'bin_end'])

Bin Parameters

Name Type Description
field Field Required. The data field to bin.
extent Array < Number > A two-element array with the minimum and maximum values of the bin range. If unspecified, the extent is set to [min, max] of the observed data values.
anchor Number A value in the binned domain at which to anchor the bins, shifting the bin boundaries if necessary to ensure that a boundary aligns with the anchor value. By default, the minimum bin extent value serves as the anchor.
maxbins Number The maximum number of bins to create (default 20).
base Number The number base to use for automatic bin determination (default 10).
step Number An exact step size to use between bins. If provided, options such as maxbins will be ignored.
steps Array < Number > An array of allowable step sizes to choose from.
minstep Number The minimum allowed bin step size (default 0).
divide Array < Number > Allowable bin step sub-divisions. The default value is [5, 2], which indicates that for base 10 numbers (the default base) automatic bin determination can consider dividing bin step sizes by 5 and/or 2.
nice Boolean If true (the default), attempts to snap bin boundaries to human-friendly values, such as multiples of ten.
as Array < String > The output field names at which to write the start and end bin values. The default is ["bin0", "bin1"].
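
As noted above, a common use case for binning is a histogram: bin the values, then count the data objects per bin. A sketch combining bin and aggregate, assuming that (as with the other transforms) each aggregate parameter has a like-named method such as drop:

// histogram: bin 'amount', then count data objects per bin
// drop(false) retains empty bins, per the aggregate parameters above
var histogram = df.dataflow([
  df.bin('amount').maxbins(20),
  df.aggregate(['bin0', 'bin1'], [df.count()]).drop(false)
]);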

# df.countpattern([field, pattern, case]) <>

Creates a new countpattern transform specification. The countpattern transform counts the number of occurrences of a text pattern, as defined by a regular expression. This transform will iterate through each data object and count all unique pattern matches found within the designated text field. The optional arguments field, pattern and case are shorthands for the corresponding parameter methods.

Example Use

// Count all alphabetic substrings in the 'description' field
// This example maps all input text to lowercase.
df.countpattern().field('description').pattern(/[a-z]+/).case('lower')
 
// Identical specification using shorthand arguments
df.countpattern('description', /[a-z]+/, 'lower')

CountPattern Parameters

Name Type Description
field Field Required. The data field containing the text data.
pattern RegExp A regular expression indicating the pattern to count. All unique pattern matches will be separately counted. The default value is [\\w\']+, which will match sequences containing word characters and apostrophes, but no other characters.
case String A lower- or upper-case transformation to apply prior to pattern matching. One of "lower", "upper" or "mixed" (the default).
stopwords String A regular expression defining a pattern of text to ignore. For example, the value "(foo|bar|baz)" will treat the words "foo", "bar" and "baz" as stopwords that should be ignored. The default value is the empty string (""), indicating no stop words.
as Array < String > The output field names for the text pattern and occurrence count. The default is ["text", "count"].

# df.filter([expr]) <>

Creates a new filter transform specification. The filter transform removes objects from a data stream based on a provided filter expression. The optional argument expr is a shorthand for the corresponding parameter method.

Example Use

let predicate = df.expr(d => d.amount > 100).fields(['amount'])
 
// Remove data objects with 'amount' values <= 100
df.filter().expr(predicate)
 
// Identical specification using shorthand arguments
df.filter(predicate)

Filter Parameters

Name Type Description
expr Expression Required. A predicate function for filtering the data. If the expression evaluates to false, the data object will be filtered.

# df.fold([fields]) <>

Creates a new fold transform specification. The fold transform collapses (or “folds”) one or more data fields into two properties: a key property (containing the original data field name) and a value property (containing the data value). The fold transform is useful for mapping matrix or cross-tabulation data into a standardized format. This transform generates a new data stream in which each data object consists of the key and value properties as well as all the original fields of the corresponding input data object. The optional argument fields is a shorthand for the corresponding parameter method.

Example Use

// Collapse the 'fieldA' and 'fieldB' fields into key-value pairs
// The output stream will contain twice as many data objects
df.fold().fields(['fieldA', 'fieldB'])
 
// Identical specification using shorthand arguments
df.fold(['fieldA', 'fieldB'])

Fold Parameters

Name Type Description
fields Array < Field > Required. An array of data fields indicating the properties to fold.
as Array < String > The output field names for the key and value properties produced by the fold transform. The default is ["key", "value"].

# df.formula([as, expr]) <>

Creates a new formula transform specification. The formula transform extends data objects with new values according to a calculation formula. The optional arguments as and expr are shorthands for the corresponding parameter methods.

Example Use

let mag = df.expr(d => Math.sqrt(d.u * d.u + d.v * d.v)).fields(['u', 'v'])
 
// Extend each object with a 'magnitude' field defined by the given function
df.formula().as('magnitude').expr(mag)
 
// Identical specification using shorthand arguments
df.formula('magnitude', mag)

Formula Parameters

Name Type Description
expr Expression Required. The formula function for calculating derived values.
as String Required. The output field at which to write the formula value.
initonly Boolean If true, the formula is evaluated only when a data object is first observed. The formula values will not automatically update if data objects are modified. The default is false.

# df.joinaggregate([groupby, measure]) <>

Creates a new joinaggregate transform specification. The joinaggregate transform extends the input data objects with aggregate values. Aggregation is performed and the results are then joined with the input data. This transform can be helpful for creating derived values that combine both raw data and aggregate calculations, such as percentages of group totals. The optional arguments groupby and measure are shorthands for the corresponding parameter methods.

The parameters for this transform are identical to the aggregate transform, but rather than creating new output objects, the results are written back to each of the input data objects. An equivalent result can be achieved using a window transform where the sliding window frame encompasses the entire group; however, the joinaggregate provides a more performant alternative for this special case.

Example Use

// Extend each data object with per-category counts and sum(amount)
df.joinaggregate()
  .groupby(['category'])
  .measure([df.count().as('cnt'), df.sum('amount')])
 
// Identical specification using shorthand arguments
df.joinaggregate(['category'], [df.count().as('cnt'), df.sum('amount')])
 
// Identical specification using measure object notation
df.joinaggregate(['category'], [
  {op: 'count', as: 'cnt'},
  {op: 'sum', field: 'amount'}
])
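
The percent-of-group-total use case mentioned above can be sketched by chaining a formula transform after the joinaggregate (the field names here are illustrative):

// extend each object with its share of the per-category total
var share = df.expr(d => d.amount / d.total).fields(['amount', 'total']);

df.dataflow([
  df.joinaggregate(['category'], [df.sum('amount').as('total')]),
  df.formula('share', share)
]);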

JoinAggregate Parameters

Name Type Description
groupby Array < Field > The data fields to group by. If not specified, a single group containing all data objects will be used.
measure Array < Measure > The aggregate measures to compute. If not specified, a single count aggregate is performed. The measures can use any supported aggregate operation.

# df.project([fields]) <>

Creates a new project transform specification. The project transform performs a relational algebra projection operation. This transform produces a stream of new data objects that include one or more fields of the source stream, with the data fields optionally renamed. The optional argument fields is a shorthand for the corresponding parameter method.

Example Use

// Project the 'amount' field to new objects with a single field named 'value'
df.project().fields([df.field('amount').as('value')])
 
// Identical specification using shorthand arguments
df.project([df.field('amount').as('value')])
 
// Identical specification using field object notation
df.project([{field: 'amount', as: 'value'}])

Project Parameters

Name Type Description
fields Array < Field > The data fields that should be copied over in the projection. If unspecified, all fields will be copied using their existing names.

# df.sample([size]) <>

Creates a new sample transform specification. The sample transform randomly samples a data stream to create a smaller stream. As input data objects are added and removed, the sampled values may change in a first-in, first-out manner. This transform uses reservoir sampling to maintain a representative sample of the stream. The optional argument size is a shorthand for the corresponding parameter method.

Example Use

// Collect a random sample of 500 data objects
df.sample().size(500)
 
// Identical specification using shorthand arguments
df.sample(500)

Sample Parameters

Name Type Description
size Number The maximum number of data objects to include in the sample. The default value is 1000.

# df.sort([compare]) <>

Creates a new sort transform specification. This transform materializes all the objects in a data stream within a single array, allowing sorting by data field values. The optional argument compare is a shorthand for the corresponding parameter method.

Example Use

// Sort in descending order by the 'amount' field
df.sort().compare('-amount')
 
// Identical specification using shorthand arguments
df.sort('-amount')
 
// Identical specification using comparator object notation
df.sort({fields: ['amount'], orders: ['descending']})
 
// Identical specification using an explicit comparator expression
df.sort(df.expr((a, b) => b.amount - a.amount).fields(['amount']))

Sort Parameters

Name Type Description
compare Compare A comparator for sorting data objects.

# df.window([compare, frame, measure]) <>

Creates a new window transform specification. The window transform performs calculations over sorted groups of data objects. These calculations include ranking, lead/lag analysis, and aggregates such as running sums and averages. Calculated values are written back to the input data stream. The optional arguments compare, frame and measure are shorthands for the corresponding parameter methods.

Example Use

df.window()
  .compare('amount')
  .frame([null, null])
  .measure([df.rank(), df.sum('amount')])
  .groupby(['category'])
 
// Identical specification using shorthand arguments
df.window('amount', [null, null], [df.rank(), df.sum('amount')])
  .groupby(['category'])
 
// Identical specification using measure object notation
df.window('amount', [null, null], [
  {op: 'rank'},
  {op: 'sum', field: 'amount'}
]).groupby(['category'])

Window Parameters

Name Type Description
compare Compare A comparator for sorting data objects within a window. If two data objects are considered equal by the comparator, they are considered "peer" values of equal rank. If compare is not specified, the order is undefined: data objects are processed in the order they are observed and none are considered peers (the ignorePeers parameter is ignored and treated as if set to true).
groupby Array < Field > The data fields by which to partition data objects into separate windows. If not specified, a single group containing all data objects will be used.
measure Array < Measure > The window measures to compute. The measures can use any supported aggregate operation or window operation.
frame Array < Number > A frame specification as a two-element array indicating how the sliding window should proceed. The array entries should either be a number indicating the offset from the current data object, or null to indicate unbounded rows preceding or following the current data object. The default value is [null, 0], indicating that the sliding window includes the current object and all preceding objects. The value [-5, 5] indicates that the window should include five objects preceding and five objects following the current object. Finally, [null, null] indicates that the window frame should always include all data objects.
ignorePeers Boolean Indicates if the sliding window frame should ignore peer values. (Peer values are those considered identical by the compare criteria). The default is false, causing the window frame to expand to include all peer values. If set to true, the window frame will be defined by offset values only. This setting only affects those operations that depend on the window frame: aggregation operations and the first_value, last_value, and nth_value window operations.
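
To make the frame parameter concrete, here are two sketches (the 'date' sort field is illustrative):

// running total: the current object and all preceding objects (the default frame)
df.window('date', [null, 0], [df.sum('amount').as('running')])

// centered moving average over up to five objects
df.window('date', [-2, 2], [df.mean('amount').as('moving')])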

Parameter Types

Parameter types for dataflow transforms:

# Array

An Array instance representing a collection of values.

# Boolean

A Boolean value. The values null and "" map to null. The strings "false" and "0" map to false. Any other values are subject to boolean coercion (!!value).

# Compare

A comparator is a function that takes two arguments a and b as input and compares them to determine a rank ordering, returning a value less than zero if a < b, a value greater than zero if a > b, and zero if the two values are equivalent. Comparators can be specified in multiple ways:

  • A string indicating a field to order by, optionally annotated with a + or - prefix to indicate ascending or descending sort, respectively. If no prefix is supplied an ascending order is assumed. For example: "amount" (implicit ascending order), "+amount" (explicit ascending order), "-amount" (descending order).
  • An array of comparator strings specifying multi-field ordering criteria. For example: ["-amount", "+age"].
  • An object with fields and orders properties providing an array of data fields to order by and an array of corresponding orders ("ascending" or "descending"). For example: {fields: ["amount", "age"], orders: ["descending", "ascending"]}.
  • An explicit comparator expression. For example: df.expr((a, b) => (b.amount - a.amount) || (a.age - b.age)).fields(['amount', 'age']).

# Expression

An expression is a function that takes one or more data objects as arguments and returns a calculated value. Expressions are useful as filtering predicates and formula calculations, or to provide customized comparators.

Expressions should be constructed using the expr API: df.expr(datum => datum.x * datum.x + datum.y * datum.y).fields(['x', 'y']).

# Field

A field is a named data attribute (or in tabular terms, a data column). These fields correspond to possibly nested properties of a data object. Field references can be specified in multiple ways:

  • A convenience API method of the form df.field("field").as("name"), indicating the string field name (or an accessor function) and optional name alias for output.
  • A string indicating the field name. Nested fields are indicated using dot (.) or bracket ([]) notation. For example: "amount", "source.x", "target['x']". To specify field names that contain dots but are not nested lookups, escape the dot inline ("my\\.field") or enclose the field name in brackets ("[my.field]").
  • An object with a field property and optional as property. The field property should be either a string field name or an expression function. The optional as property specifies a name for the field, and can be used to specify the output names for a project transform or aggregate groupby. If as is not specified and the field is a string name, that name is used.
  • An expression function that takes a data object as input and returns a value. This option can be used to create virtual fields that are actually derived values of the named properties of the object. For example: df.expr(d => Math.sqrt(d.amount)).fields(['amount']).as('sqrt_amount').

# Measure

A measure is a window or aggregate operation to apply across a collection of data values. Measures can be specified in multiple ways:

  • Convenience API methods for each window and aggregate operation. For example: df.sum('amount'), df.ntile(4).
  • An object specifying the operation. For example: {"op": "sum", "field": "amount"}, {"op": "ntile", "param": 4}. The supported object properties are:
    • op: the window or aggregate operation name. This property is required in all cases.
    • field: a data field reference. This property is required for all aggregate operations and for window operations that operate over data field values.
    • param: an operation parameter. Applicable only to a subset of window operations.
    • as: output field name. Optional property to specify the output field.

# Number

A Number value. The values null and "" map to null. Any other values are subject to number coercion (+value).

# RegExp

A RegExp value representing a well-formatted regular expression. The values null and "" map to null. A RegExp value is used as-is. Any other values are subject to string coercion (value+'') and then interpreted as properly-escaped regular expression strings.

# String

A String value. The values null and "" map to null. Any other values are subject to string coercion (value + '').

Aggregate Operations

Aggregate operations that can be used as entries of the measure parameter of the aggregate, joinaggregate, and window transforms. For each operation, the as method call is optional.
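
Any of the operations below can be combined within a single measure array. For example, a quartile summary per category:

df.aggregate(['category'], [
  df.min('amount'),
  df.q1('amount'),
  df.median('amount'),
  df.q3('amount'),
  df.max('amount')
])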

# df.count().as(name) <>

The total count of data objects in an aggregation group.

# df.valid(field).as(name) <>

The count of field values that are not null, undefined, or NaN.

# df.missing(field).as(name) <>

The count of null or undefined field values.

# df.distinct(field).as(name) <>

The count of distinct field values.

# df.sum(field).as(name) <>

The sum of field values.

# df.mean(field).as(name) <>

The mean (average) field value.

# df.average(field).as(name) <>

The mean (average) field value. Identical to mean.

# df.variance(field).as(name) <>

The sample variance of field values.

# df.variancep(field).as(name) <>

The population variance of field values.

# df.stdev(field).as(name) <>

The sample standard deviation of field values.

# df.stdevp(field).as(name) <>

The population standard deviation of field values.

# df.stderr(field).as(name) <>

The standard error of field values.

# df.median(field).as(name) <>

The median field value.

# df.q1(field).as(name) <>

The lower quartile boundary of field values.

# df.q3(field).as(name) <>

The upper quartile boundary of field values.

# df.ci0(field).as(name) <>

The lower boundary of the bootstrapped 95% confidence interval of the mean field value.

# df.ci1(field).as(name) <>

The upper boundary of the bootstrapped 95% confidence interval of the mean field value.

# df.min(field).as(name) <>

The minimum field value.

# df.max(field).as(name) <>

The maximum field value.

# df.argmin(field).as(name) <>

An input data object containing the minimum field value.

# df.argmax(field).as(name) <>

An input data object containing the maximum field value.

Window Operations

Window operations that can be used as entries of the measure parameter of the window transform. For each operation, the as method call is optional.
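
For example, ranking data objects and comparing each to its predecessor within a category:

df.window()
  .compare('-amount')
  .measure([df.rank().as('rank'), df.lag('amount').as('prev')])
  .groupby(['category'])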

# df.row_number().as(name) <>

Assigns each data object a consecutive row number, starting from 1.

# df.rank().as(name) <>

Assigns a rank order value to each data object in a window, starting from 1. Peer values are assigned the same rank. Subsequent rank scores incorporate the number of prior values. For example, if the first two values tie for rank 1, the third value is assigned rank 3.

# df.dense_rank().as(name) <>

Assigns dense rank order values to each data object in a window, starting from 1. Peer values are assigned the same rank. Subsequent rank scores do not incorporate the number of prior values. For example, if the first two values tie for rank 1, the third value is assigned rank 2.

# df.percent_rank().as(name) <>

Assigns a percentage rank order value to each data object in a window. The percent is calculated as (rank - 1) / (group_size - 1).

# df.cume_dist().as(name) <>

Assigns a cumulative distribution value between 0 and 1 to each data object in a window.

# df.ntile(parameter).as(name) <>

Assigns a quantile (e.g., percentile) value to each data object in a window. Accepts an integer parameter indicating the number of buckets to use (e.g., 100 for percentiles, 5 for quintiles).

# df.lag(field[, parameter]).as(name) <>

Assigns the value of field from the data object that precedes the current object by a specified number of positions. If no such object exists, assigns null. Accepts an offset parameter (default 1) that indicates the number of positions.

# df.lead(field[, parameter]).as(name) <>

Assigns the value of field from the data object that follows the current object by a specified number of positions. If no such object exists, assigns null. Accepts an offset parameter (default 1) that indicates the number of positions.

# df.first_value(field).as(name) <>

Assigns the value of field from the first data object in the current sliding window frame.

# df.last_value(field).as(name) <>

Assigns the value of field from the last data object in the current sliding window frame.

# df.nth_value(field[, parameter]).as(name) <>

Assigns the value of field from the nth data object in the current sliding window frame. If no such object exists, assigns null. Requires a non-negative integer parameter that indicates the offset from the start of the window frame.
