This plugin provides a set of native statements. They are always available, as they are loaded automatically.
npm install @ezs/core
Several statements create sub-pipelines (sub flows), either from a file of statements or from nested statements. They are all used in the same way (with the same parameters), so some may look similar, but their behaviour differs:
- [delegate] : one sub-pipeline for all items
- [swing] : one sub-pipeline for all items matching a condition
- [spawn] : one sub-pipeline per item
- [loop] : one sub-pipeline per item
- [expand] : one sub-pipeline for N items (N = size); only the selected field is sent into the pipeline
- [combine] : one sub-pipeline for all items; only the selected field is compared with the result of the sub-pipeline
- [singleton] : one sub-pipeline for the first item
- assign
- combine
- concat
- debug
- dedupe
- delegate
- dispatch
- dump
- env
- exchange
- expand
- extract
- fork
- group
- identify
- ignore
- keep
- loop
- map
- metrics
- overturn
- pack
- parallel
- pop
- remove
- replace
- shift
- shuffle
- singleton
- spawn
- swing
- throttle
- time
- tracer
- transit
- truncate
- ungroup
- unpack
- use
- validate
Add a new field to an Object.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[assign]
path = b.c
value = 'X'
Output:
[{
"a": 1,
"b": { "c": "X" }
},
{
"a": 2,
"b": { "c": "X" }
},
{
"a": 3,
"b": { "c": "X" }
},
{
"a": 4,
"b": { "c": "X" }
},
{
"a": 5,
"b": { "c": "X" }
}]
Returns Object
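The effect of [assign] can be sketched in plain Node.js, outside of any ezs stream. `setPath` here is a hypothetical helper (similar in spirit to lodash's `set`) that creates intermediate objects along a dotted path:

```javascript
// Minimal sketch (no ezs involved): what [assign] does to each object.
// setPath is a hypothetical helper that creates intermediate objects.
function setPath(obj, path, value) {
  const keys = path.split('.');
  let node = obj;
  for (const key of keys.slice(0, -1)) {
    if (typeof node[key] !== 'object' || node[key] === null) node[key] = {};
    node = node[key];
  }
  node[keys[keys.length - 1]] = value;
  return obj;
}

const input = [{ a: 1 }, { a: 2 }];
const output = input.map((obj) => setPath(obj, 'b.c', 'X'));
console.log(JSON.stringify(output));
// → [{"a":1,"b":{"c":"X"}},{"a":2,"b":{"c":"X"}}]
```

Within ezs, the same operation is expressed declaratively (path = b.c, value = 'X') and applied to every object in the stream.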
Takes an Object
and substitutes a field with the corresponding value found in an external pipeline.
The sub-pipeline must produce a stream of special objects ({ id, value }).
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[combine]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
- path (String?): the path to substitute
- default (String?): value used if no substitution occurs (otherwise the value stays unchanged)
- primer (String): data to send to the external pipeline (optional, default: n/a)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
- cacheName (String?): enable cache, with a dedicated name
Returns Object
Take all Strings, concatenate them, and throw just one.
[
"a",
"b",
"c"
]
Script:
[concat]
beginWith = <
joinWith = |
endWith = >
Output:
[
"<a|b|c>"
]
- beginWith (String?): value added at the beginning
- joinWith (String?): value used to join two chunks
- endWith (String?): value added at the end
Returns String
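As a rough plain-JavaScript sketch (not the actual ezs implementation), [concat] amounts to buffering every string chunk and emitting a single string at the end:

```javascript
// Sketch of [concat]: buffer every string chunk, then emit one string.
function concatAll(chunks, { beginWith = '', joinWith = '', endWith = '' } = {}) {
  return beginWith + chunks.join(joinWith) + endWith;
}

console.log(concatAll(['a', 'b', 'c'], { beginWith: '<', joinWith: '|', endWith: '>' }));
// → <a|b|c>
```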
Take an Object, print it (with its number), and throw the same object.
With ezs debug enabled: every object is stringified and printed, along with all other ezs debug traces.
With ezs debug disabled: every object is inspected (indented and colorized) and printed on stderr (error level) or stdout (log level).
If the ezs parameter is set, no object is logged (it is a global action).
- level (String): console level: log, error, or silent (optional, default: error)
- text (String): text printed before the dump (optional, default: valueOf)
- path (String?): path of the field to print
- ezs (Boolean?): enable or disable ezs global debug traces
Returns Object
Take an Object
and check that its identifier has not already been used previously.
- path (String): path containing the object identifier (optional, default: uri)
- ignore (Boolean): just ignore duplicate objects (optional, default: false)
Returns Object
Delegate processing to an external pipeline.
Note: works like spawn, but each chunk shares the same external pipeline.
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
Dispatch processing to an external pipeline on one or more servers.
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
Returns Object
Take all Objects and generate a JSON array.
[
{ "a": 1 },
{ "a": 2 },
{ "a": 3 },
{ "a": 4 },
{ "a": 5 }
]
Script:
[dump]
indent = true
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}
]
- indent (Boolean): indent JSON (optional, default: false)
Returns String
Send the input object again, while adding new environment field(s) with the
first Object
of the feed.
Returns Object
Take an Object and throw a new item computed by the value= parameter (which replaces the input one).
Input file:
[{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
},
{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
}]
Script:
[exchange]
value = omit('c')
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
- value (String?): value to replace the input object
Returns Object
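The omit('c') expression above can be pictured with the following plain-JavaScript sketch; `omit` here is a local stand-in for the expression helper, not the ezs API itself:

```javascript
// Sketch of the exchange example: value = omit('c') replaces each input
// object with a copy missing the given field.
function omit(obj, field) {
  const { [field]: _dropped, ...rest } = obj;
  return rest;
}

const output = [{ a: 'abcdefg', b: '1234567', c: 'XXXXXXX' }].map((o) => omit(o, 'c'));
console.log(output);
// → [ { a: 'abcdefg', b: '1234567' } ]
```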
Takes an Object
and substitutes a field with the corresponding value found in an external pipeline.
The sub-pipeline receives a special object { id, value }: id is the item identifier, and value is the value at the item's path.
The sub-pipeline can expand value with another.
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[expand]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
- path (String?): the path to substitute
- size (Number): how many chunks to send to the external pipeline (optional, default: 1)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
- cacheName (String?): enable cache, with a dedicated name
- token (String?): add token values in the sub-pipeline (optional)
Returns Object
Take an Object and throw each value of the selected fields.
Note: extract cannot throw undefined or null values.
[{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
},
{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
}]
Script:
[extract]
path = a
path = b
Output:
[
"abcdefg",
"1234567",
"abcdefg",
"1234567"
]
- path (String?): path of the field to extract
Returns Object
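A minimal plain-JavaScript sketch of this behaviour (one-level paths only, for brevity):

```javascript
// Sketch of [extract]: for each object, throw the value of every listed
// path, skipping undefined/null values (as the note above states).
function extract(objs, paths) {
  const out = [];
  for (const obj of objs) {
    for (const path of paths) {
      const value = obj[path]; // one-level paths only, for brevity
      if (value !== undefined && value !== null) out.push(value);
    }
  }
  return out;
}

console.log(extract(
  [{ a: 'abcdefg', b: '1234567', c: 'X' }],
  ['a', 'b']
));
// → [ 'abcdefg', '1234567' ]
```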
Fork the current pipeline.
Note: each chunk is sent to the same external pipeline.
- standalone (Boolean): the current pipeline can end without waiting for the end of the external pipeline (optional, default: false)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
Take all chunks, and throw them grouped by length.
See also ungroup.
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Script:
[group]
length = 3
Output:
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
- length (Number?): size of each partition
Returns String
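Conceptually, [group] partitions the stream into arrays of at most `length` items; a plain-JavaScript sketch:

```javascript
// Sketch of [group]: partition the stream into arrays of `length` items
// (the last partition may be shorter).
function group(items, length) {
  const out = [];
  for (let i = 0; i < items.length; i += length) {
    out.push(items.slice(i, i + length));
  }
  return out;
}

console.log(group(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3));
// → [ [ 'a', 'b', 'c' ], [ 'd', 'e', 'f' ], [ 'g', 'h' ] ]
```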
Take an Object, compute an identifier, and add it.
- scheme (String): scheme to use (uid or sha) (optional, default: uid)
- path (String): path containing the object identifier (optional, default: uri)
Returns String
Takes all the chunks, and ignores the first N chunks.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[ignore]
length = 3
Output:
[{
"a": 4
},
{
"a": 5
}]
- length (Number?): length of the feed to ignore
Returns any
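In plain JavaScript the behaviour is simply a skip of the first N items:

```javascript
// Sketch of [ignore]: drop the first `length` chunks, pass the rest through.
const ignore = (items, length) => items.slice(length);

console.log(ignore([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }], 3));
// → [ { a: 4 }, { a: 5 } ]
```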
Throw input Object
but keep only specific fields.
Input file:
[{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
},
{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
}]
Script:
[keep]
path = a
path = b
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
- path (String?): path of the field to keep
Returns Object
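A plain-JavaScript sketch of [keep] (one-level paths only, for brevity):

```javascript
// Sketch of [keep]: rebuild each object with only the listed fields.
function keep(obj, paths) {
  const out = {};
  for (const path of paths) {
    if (path in obj) out[path] = obj[path]; // one-level paths, for brevity
  }
  return out;
}

console.log(keep({ a: 'abcdefg', b: '1234567', c: 'XXXXXXX' }, ['a', 'b']));
// → { a: 'abcdefg', b: '1234567' }
```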
Loop on an external pipeline until the test is true.
Note: works like delegate, but each chunk uses its own external pipeline.
- test (String?): the test to evaluate
- reverse (Boolean): reverse the test (optional, default: false)
- maxDepth (Number): limit the number of loops (optional, default: 100000)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
From an array field, delegate processing of each item to an external pipeline.
Note: works like delegate, but each chunk uses its own external pipeline.
- path (String?): the path to substitute
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
- **See: ../server/knownPipeline.js**
Take an Object, and throw the same object.
This statement is only used when:
- EZS_METRICS is enabled
- ezs is running in server mode
WARNING: avoid setting bucket to "input" or "output", as these labels are used by ezs. If you do, you risk distorting the associated metrics.
- pathName (String): identifies the script (optional, default: auto)
- bucket (String): identifies the moment of measurement (optional, default: unknow)
Returns Object
Takes an Object
and substitutes a field twice with the corresponding value found in an external pipeline.
The sub-pipeline receives a special object { id, value, token }:
- id is the item identifier
- value is the value at the item's path
- token is an array containing the stream id and a number (0 for the first pass, 1 for the second)
The sub-pipeline can overturn value with another.
It works like [expand], but the second pass starts only when all the values of the stream have been sent once.
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges'},
]
Script #1:
[overturn]
path = dept
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges' },
]
Script #2:
[overturn]
path = dept
[overturn/drop]
path = token.1
if = 0
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'esueM' },
{ year: 2001, dept: 'ellesoM' },
{ year: 2003, dept: 'segsoV' },
]
- path (String?): the path to overturn
- size (Number): how many chunks to send to the external pipeline (optional, default: 1)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
Returns Object
Take all Objects, throw encoded Strings.
Returns String
Takes an Object and delegates processing to X internal pipelines.
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
- **See: shift**
Return the last Object
and close the feed
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[pop]
Output:
[{
"a": 5
}]
Returns Object
Take Object
and remove it from the feed if test is true
Input file:
[{
a: "a"
},
{
a: 2
},
{
a: "b"
},
{
a: 4
},
{
a: "c"
}]
Script:
[remove]
test = get('a').isInteger()
reverse = true
Output:
[
{
a: 2
},
{
a: 4
}
]
Returns Object
Take Object
and replace it with a new object with some fields.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[replace]
path = b.c
value = 'X'
Output:
[{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
}]
Returns Object
Return the first Object
and close the feed
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[shift]
Output:
[{
"a": 1
}]
Returns Object
Take Object
, shuffle data of the whole object or only some fields specified by path
Input file:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Script:
[shuffle]
path = a
Output:
[{
"a": "cadbefg",
"b": "1234567"
},
{
"a": "dcaegbf",
"b": "1234567"
}]
- path (String?): path of the field to shuffle
Returns Object
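A plain-JavaScript sketch of shuffling one string field with the Fisher-Yates algorithm; the actual characters produced are random, so only invariants (length, untouched fields) can be checked:

```javascript
// Sketch of [shuffle] on a single field: shuffle the characters of the
// string at `path` (one-level path here), leaving other fields untouched.
function shuffleField(obj, path) {
  const chars = String(obj[path]).split('');
  for (let i = chars.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1)); // Fisher-Yates swap
    [chars[i], chars[j]] = [chars[j], chars[i]];
  }
  return { ...obj, [path]: chars.join('') };
}

const shuffled = shuffleField({ a: 'abcdefg', b: '1234567' }, 'a');
console.log(shuffled.b); // '1234567' stays untouched
```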
Takes only the first Object and delegates processing to an external pipeline.
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
Delegate processing to an external pipeline, and throw each chunk from the result.
Note: works like delegate, but each chunk uses its own external pipeline.
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
- cache (String?): use a specific ezs statement to run commands (advanced)
Returns Object
Delegate processing to an external pipeline under specific conditions.
Note: works like spawn, but each chunk shares the same external pipeline.
- test (String?): the test to evaluate
- reverse (String): reverse the test (optional, default: false)
- file (String?): the external pipeline is described in a file
- script (String?): the external pipeline is described in a string of characters
- commands (String?): the external pipeline is described in an object
- command (String?): the external pipeline is described in a URL-like command
- logger (String?): a dedicated pipeline described in a file to trap or log errors
Returns Object
Take Object
and return the same object
[
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
]
Script:
[use]
plugin = analytics
[throttle]
bySecond = 2
Output:
[
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
]
- bySecond (Number): number of objects per second (optional, default: 1)
Returns Object
Measure the execution time of a script, on each chunk of the input.
- script (string?): the script whose execution time is measured
Input
[1]
Program
// assumed imports: @ezs/core plus any helper producing an object-mode stream
const ezs = require('@ezs/core');
const from = require('from');
const script = `
[transit]
`;
from([1])
.pipe(ezs('time', { script }))
Output
[{
data: 1,
time: 15 // milliseconds
}]
Returns object
Take Object
, print a character and throw the same object.
Useful to see the progress in the stream.
- print (String): character to print for each object (optional, default: .)
- last (String): character to print at the last call (optional, default: .)
- first (String): character to print at the first call (optional, default: .)
Returns Object
Take Object
and throw the same object again.
Input file:
[{
"a": 1
},
{
"a": 2
}]
Script:
[transit]
Output:
[{
"a": 1
},
{
"a": 2
}]
Returns Object
Takes all the chunks, and closes the feed when the total length is equal to the parameter.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[truncate]
length = 3
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
}]
- length (Number?): length of the feed
Returns any
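In plain JavaScript, [truncate] behaves like taking the first N items and then closing the feed:

```javascript
// Sketch of [truncate]: pass chunks through until `length` have been sent,
// then stop.
const truncate = (items, length) => items.slice(0, length);

console.log(truncate([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }], 3));
// → [ { a: 1 }, { a: 2 }, { a: 3 } ]
```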
Take all chunks, and throw each item contained in every chunk.
See also group.
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
Script:
[ungroup]
Output:
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Returns Array<any>
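A plain-JavaScript sketch: [ungroup] is essentially a flatten, the inverse of [group]:

```javascript
// Sketch of [ungroup]: throw every item of each array chunk individually.
const ungroup = (chunks) => chunks.flat();

console.log(ungroup([['a', 'b', 'c'], ['d', 'e', 'f'], ['g', 'h']]));
// → [ 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' ]
```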
Take Strings or Buffers and throw Objects built by JSON.parse on each line.
Returns object
Load one or more plugins, making their statements available to the current script.
Script:
[use]
plugin = basics
plugin = analytics
Returns String
From an Object
, throw the same object if all rules pass
Input file:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
},
{
"a": false
},
]
Script:
[validate]
path = a
rule = required|number
path = b
rule = required|string
Output:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
}]
Returns Object
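A plain-JavaScript sketch of rule-based filtering in the spirit of [validate]; the rule strings are pipe-separated as above, but only the required, number, and string checks are implemented here:

```javascript
// Sketch of [validate]: keep an object only if every rule passes.
// Only three checks are implemented, for illustration.
const checks = {
  required: (v) => v !== undefined && v !== null,
  number: (v) => typeof v === 'number',
  string: (v) => typeof v === 'string',
};

function validate(obj, rules) {
  return rules.every(({ path, rule }) =>
    rule.split('|').every((name) => checks[name](obj[path]))
  );
}

const input = [
  { a: 1, b: 'titi' },
  { a: 2, b: 'toto' },
  { a: false },
];
const rules = [
  { path: 'a', rule: 'required|number' },
  { path: 'b', rule: 'required|string' },
];
console.log(input.filter((o) => validate(o, rules)));
// → [ { a: 1, b: 'titi' }, { a: 2, b: 'toto' } ]
```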