Share your code.

`timestream`

is a suite of tools for working with `objectMode`

streams of time-ordered records, i.e. timeseries data.

**NOTE: THIS IS A WORK IN PROGRESS**

Consider this to be unstable, check back often for updates.

`var timestream = // generate a geometric series: var s = timestream // generate a random series: var r = timestream // sin(x) each record of stream s and union it with series r s `

On its own, this library provides a way to generate either geometrically increasing/decreasing data, or random data. You can also use stream-spigot to generate records.

The intent of the library is to be used in situations where you are provided an objectMode stream of time-sequential records, such as level-version

The library provides a wide variety of operations that can be done to timeseries data, from joining it by timestamp to other streams, doing rolling aggregates, performing transforms on each record, or filtering the streams.

See the API for a full list of operations, including some which let you simply provide your own transforms or filters.

All of the transforms and joins return a new Timestream object and are meant to be chained.

There are currently three provided means for getting data out of the timestream pipeline: `pipe`

`tail`

and `toArray`

-- Although the timestream object isn't actually a stream, it encapsulates a stream that it provides `pipe`

access to.

`timestream.pipe(stream [,options])`

Pipe the encapsulated stream to a `stream.Transform`

or `stream.Writable`

stream for downstream processing. This method does *not* return a Timestream and cannot be chained.

`timestream.tail(fn)`

Provide a function of the form `fn(record)`

that will be called asynchronously as records are completed in the pipeline. This method does *not* return a Timestream and cannot be chained.

`timestream.toArray(fn)`

Provide a function that will accept the fully realized transformed record set in a single Array of records. Function should be `fn(recordArray)`

and will be called once all input streams have ended and all transformations have occured. If your input streams never end, you may want to avoid using `toArray`

.

There are a few ways to generate timestreams via this library.

- gen
- rand
- one

`timestream.gen(options)`

Generate a geometric sequence with a single numeric record at each timestamp.

Options:

- start (required): A millisecond timestamp for the first record
- until (required): A millisecond timestamp for the maximum possible timestamp in this series
- interval (required): A number of milliseconds to increment each record's timestamp by
- key: A name for the value at each record. Default
`gen`

- initial: An initial value for the first record. Default 0
- increment: How much to increment the value by for each record. Default 1

`timestream.rand(options)`

Generate a random series with a `Math.random()`

value at each timestamp.

Options:

- start (required): A millisecond timestamp for the first record
- until (required): A millisecond timestamp for the maximum possible timestamp in this series
- interval (required): A number of milliseconds to increment each record's timestamp by
- key: A name for the value at each record. Default
`rand`

`timestream.one(timestamp [,record])`

Generate a single record at a single point in time. Default record is {gen: 1}, accepts any type of record.

Join operations combine two timestreams based on the timestamps. To join records the millisecond timestamps must be identical. All join operations are considered **left** side operations, that is when combining records, they will use the left values where matching records have keys that overlap.

**NOTE: You'll frequently/usually want to do an aggregation operation before joining to make sure the temestamps match.**

- union
- join
- intersect
- complement
- diff
- where

`timestream.union(otherTimestream)`

Perform a **left** union operation. Take all records from both sets. Combine overlapping records.

`timestream.join(otherTimestream)`

Perform a **left** join operation. Take all records from the left set, combined with any values from matching records in the right set.

`timestream.intersect(otherTimestream)`

Perform a **left** intersection. Take only records where both sets have matching timestamps.

`timestream.complement(otherTimestream)`

Perform a complement. Of the combined sets take only records that complement the left set, that is records on the right only that have no matching left record.

`timestream.diff(otherTimestream)`

Perform a symmetric difference. Keep only records where neither set overlaps the other.

`timestream.where(filterFn, otherTimestream)`

Performs a **left** join with a filter function. If your filter returns `true`

it will keep the **left** record, otherwise it will skip it. Filter function is `filterFn(leftRecord, rightRecord)`

where rightRecord could be null. The record **can** be mutated in your filter function.

Aggregation operations combine records of a single timestream based on regular time intervals. They effectively pivot each set of records over the keys and apply the function to each key over all records in a time window.

- sum
- count
- mean
- mode
- median
- percentile
- variance
- stdev
- min
- max
- first
- last
- sample

All aggregates accept an interval slice that it will partition the streams into. This can either be a raw number, or any of the intervals accepted by floordate:

- s, sec, secs, second, seconds
- m, min, mins, minute, minutes
- h, hr, hrs, hour, hours
- d, day, days
- w, wk, wks, week, weeks
- M, mon, mons, month, months
- q, qtr, qtrs, quarter, quarters
- y, yr, yrs, year, years

If no interval is specified, the operation is applied over every record resulting in a single record.

`timestream.sum([interval])`

Aggregate each time interval into a single record that is a sum of the records.

`timestream.count([interval])`

Aggregate each time interval into a single record that is a count of instances of keys accross the records.

`timestream.mean([interval])`

Average (mean) all records into a single record by time interval.

`timestream.mode([interval])`

Average (mode) all records into a single record by time interval.

`timestream.median([interval])`

Average (median) all records into a single record by time interval.

`timestream.percentile([interval,] percent)`

Determine the specified percentile of each record accross the interval into a single record.

`timestream.variance([interval])`

Calculate the statstical variance from the mean accross the interval into a single record.

`timestream.stdev([interval])`

Determine the standard deviation of each record from the mean of the records in the interval.

`timestream.min([interval])`

Aggregate each time interval into a single record that is the minimum value of each key accross the records.

`timestream.max([interval])`

Aggregate each time interval into a single record that is the maximum value of each key accross the records.

`timestream.first([interval])`

Take the first record in each time window.

`timestream.last([interval])`

Take the last record in each time window.

`timestream.sample([interval])`

Take a random record from each time window.

Filter a single timeseries keeping only records that satisfy the filter.

- range
- rtrim
- ltrim
- scrub
- filter

`timestream.range(start, end)`

Keep an (inclusive) time range from the timestream.

`timestream.rtrim(n)`

Keep only the latest N records from the **right** side of the timestream, e.g. the last N chronologically.

`timestream.ltrim(n)`

Keep only the first N records from the **left** side of the timestream, e.g. the first N chronologically.

`timestream.scrub()`

Remove records that are "empty", that is they have no data beyond the timestamp.

`timestream.filter(fn)`

Apply a filter function to each record, returning true if it is to be kept. Function should be `fn(record)`

and return true to keep the record, or false to discard it.

This set of transform operations operate on each record, and thus will forward the same number of records downstream, unlike the filters or aggregates.

- each
- ceil
- floor
- round
- abs
- log
- exp
- pow
- sqrt
- sin
- cos
- plus
- minus
- times
- divide
- elapsed
- dt
- cumsum
- sma
- keep
- into
- rename
- numbers
- flatten
- nest
- slide
- map

`each(fn)`

Apply `fn`

to each value in each record. Walks through each record calling `fn`

for each value, so `fn`

should accept a value and return what you would like the new value to be.

`ceil()`

Apply `Math.ceil`

to each numeric value in each record.

`floor()`

Apply `Math.floor`

to each numeric value in each record.

`round(factor)`

Round each numeric value in each record to the specified factor. E.g. if the factor is `10`

it will round to the tens place `333 -> 330`

.

`abs()`

Apply `Math.abs`

to each numeric value in each record.

`log()`

Apply `Math.log`

to each numeric value in each record.

`exp()`

Apply `Math.exp`

to each numeric value in each record.

`pow(factor)`

Apply `Math.pow(number, factor)`

to each numeric value in each record.

`sqrt()`

Apply `Math.sqrt`

to each numeric value in each record.

`sin()`

Apply `Math.sin`

to each numeric value in each record.

`cos()`

Apply `Math.cos`

to each numeric value in each record.

`plus(addend)`

Add the value `addend`

to each numeric value in each record.

`minus(addend)`

Subtract the value `addend`

from each numeric value in each record.

`times(factor)`

Multiply the value `factor`

by each numeric value in each record.

`divide(factor)`

Divide each numeric value in each record by the value `factor`

.

`elapsed()`

Insert a new key `elapsed`

in each record, which is the difference in time since the previous record in the timeseries.

`dt()`

For each numeric value in each record, replace the value with its difference from the previous value. This can be considered similar to a differential.

`cumsum()`

Replace each numeric value with the cumulative sum of all numeric values at that key prior to this record.

`sma(n)`

Replace each numeric value with the Simple Moving Average (mean) of that value for the previous `n`

records.

`keep(keys)`

Keep only the keys specified by the array `keys`

in each record.

`into(path [,name])`

Replace the record with a new record which is at the key or key path specified by `path`

and optionally rename the key to `name`

. Use this to convert timeseries with partitioned or nested data into specific portions of each record only. `path`

accepts js dot notation, e.g. `into("v", "foo.bar[2]")`

would find in each record a property named `foo`

, in each of those objects a property named `bar`

which stores an array, then from that array take the 3rd element only.

`rename(from, to)`

Rename the key `from`

to the name `to`

at each record.

`numbers()`

Remove all non-numeric values from each record.

`flatten()`

Flatten the record (using flatnest) into a record with no nested structures, preserving content.

E.g.

`[ {_t: 0, abc: {def: ["v0", "v0.1"]}, zyx: ["aa", "ab"]}, {_t: 1, abc: {def: ["v1", "v1.1"]}, zyx: ["ba", "bb"]}, {_t: 2, abc: {def: ["v2", "v2.1"]}, zyx: ["ca", "cb"]}, {_t: 3, abc: {def: ["v3", "v3.1"]}, zyx: ["da", "db"]}, {_t: 4, abc: {def: ["v4", "v4.1"]}, zyx: ["ea", "eb"]}, {_t: 5, abc: {def: ["v5", "v5.1"]}, zyx: ["fa", "fb"]}, {_t: 6},]`

Becomes:

`[ {"_t":0,"abc.def[0]":"v0","abc.def[1]":"v0.1","zyx[0]":"aa","zyx[1]":"ab"}, {"_t":1,"abc.def[0]":"v1","abc.def[1]":"v1.1","zyx[0]":"ba","zyx[1]":"bb"}, {"_t":2,"abc.def[0]":"v2","abc.def[1]":"v2.1","zyx[0]":"ca","zyx[1]":"cb"}, {"_t":3,"abc.def[0]":"v3","abc.def[1]":"v3.1","zyx[0]":"da","zyx[1]":"db"}, {"_t":4,"abc.def[0]":"v4","abc.def[1]":"v4.1","zyx[0]":"ea","zyx[1]":"eb"}, {"_t":5,"abc.def[0]":"v5","abc.def[1]":"v5.1","zyx[0]":"fa","zyx[1]":"fb"}, {"_t":6}] `

`nest()`

Nest the record (using flatnest) into a nested structure based on the key names. Typically used to undo a `flatten()`

operation.

`slide(value)`

Add `value`

to each record's timestamp, effectively sliding it in time.

`map(fn)`

Do it yourself! Full control of each record, using through2-map. Provide a function that accepts a record, and return a new record to send downstream.

MIT