@ezs/core

3.8.3 • Public • Published

core

This plugin provides a set of native statements. They are always available because they are loaded automatically.

installation

npm install @ezs/core

details

Several statements create sub-pipelines (sub flows), either from an instruction file or from nested instructions. They are all used the same way (with the same parameters) and some may look similar, but they behave differently:

  • [delegate]: 1 sub-pipeline for all items
  • [swing]: 1 sub-pipeline for all items matching a condition
  • [spawn]: 1 sub-pipeline per item
  • [loop]: 1 sub-pipeline per item
  • [expand]: 1 sub-pipeline for N items (N = size); only the selected field is sent into the pipeline
  • [combine]: 1 sub-pipeline for all items; only the selected field is compared with the result of the sub-pipeline
  • [singleton]: 1 sub-pipeline for the first item
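
The difference between "one sub-pipeline for all items" and "one sub-pipeline per item" can be pictured with a small plain JavaScript model. This is only a conceptual sketch: the sub-pipeline is modelled as a stateful function, and none of the names below (createSubPipeline, sharedSubPipeline, subPipelinePerItem) exist in ezs.

```javascript
// Conceptual model only: a "sub-pipeline" is a stateful function that
// numbers the items it receives. None of these names come from ezs.
function createSubPipeline() {
    let count = 0;
    return (item) => ({ item, seenBySubPipeline: ++count });
}

// Like [delegate]: one sub-pipeline instance shared by every item.
function sharedSubPipeline(items) {
    const pipeline = createSubPipeline();
    return items.map((item) => pipeline(item));
}

// Like [spawn]: a fresh sub-pipeline instance for each item.
function subPipelinePerItem(items) {
    return items.map((item) => createSubPipeline()(item));
}

const shared = sharedSubPipeline(['a', 'b', 'c']);
const perItem = subPipelinePerItem(['a', 'b', 'c']);
console.log(shared.map((r) => r.seenBySubPipeline)); // [ 1, 2, 3 ]
console.log(perItem.map((r) => r.seenBySubPipeline)); // [ 1, 1, 1 ]
```

A shared instance keeps its state across items, which matters as soon as the sub-pipeline aggregates or counts; a per-item instance always starts from scratch.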

usage

assign

Add a new field to an Object.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[assign]
path = b.c
value = 'X'

Output:

[{
   "a": 1,
   "b": { "c": "X" }
},
{
   "a": 2,
   "b": { "c": "X" }
},
{
   "a": 3,
   "b": { "c": "X" }
},
{
   "a": 4,
   "b": { "c": "X" }
},
{
   "a": 5,
   "b": { "c": "X" }
}]

Parameters

  • path String? path of the new field
  • value String? value of the new field

Returns Object
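
As a rough picture of what a dotted path such as b.c means, here is a plain JavaScript sketch. The helper assignPath is hypothetical and is not the ezs implementation; in particular it takes a literal value, whereas the value parameter of the statement is written as an expression in the script.

```javascript
// Hypothetical helper, not the ezs implementation: returns a copy of `obj`
// with `value` stored at a dotted `path` such as 'b.c'.
function assignPath(obj, path, value) {
    const keys = path.split('.');
    const result = { ...obj };
    let cursor = result;
    keys.slice(0, -1).forEach((key) => {
        // Copy (or create) each intermediate object so the input is not mutated.
        const child = cursor[key];
        cursor[key] = (child !== null && typeof child === 'object') ? { ...child } : {};
        cursor = cursor[key];
    });
    cursor[keys[keys.length - 1]] = value;
    return result;
}

const assigned = [{ a: 1 }, { a: 2 }].map((item) => assignPath(item, 'b.c', 'X'));
console.log(JSON.stringify(assigned));
// [{"a":1,"b":{"c":"X"}},{"a":2,"b":{"c":"X"}}]
```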

combine

Takes an Object and substitutes a field with the corresponding value found in an external pipeline. The internal pipeline must produce a stream of special objects (id, value).

Input file:

[
          { year: 2000, dept: 54 },
          { year: 2001, dept: 55 },
          { year: 2003, dept: 54 },
]

Script:

[use]
plugin = analytics

[combine]
path = dept
file = ./departement.ini

Output:

 [
          { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
          { year: 2001, dept: { id: 55, value: 'Meuse' } },
          { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
 ]

Parameters

  • path String? the path to substitute
  • default String? value if no substitution (otherwise the value stays unchanged)
  • primer String Data to send to the external pipeline (optional, default n/a)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cacheName String? Enable cache, with dedicated name

Returns Object

concat

Takes all Strings, concatenates them and throws a single one.

Input file:

[
     "a",
     "b",
     "c"
]

Script:

[concat]
beginWith = <
joinWith = |
endWith = >

Output:

[
     "<a|b|c>"
]

Parameters

  • beginWith String? value added at the beginning
  • joinWith String? value used to join two chunks
  • endWith String? value added at the end

Returns String
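
The effect of the three parameters can be sketched in plain JavaScript. The function concatChunks is hypothetical, and the empty-string defaults are an assumption rather than documented ezs behaviour.

```javascript
// Hypothetical helper, not the ezs implementation.
// Assumption: each parameter defaults to an empty string.
function concatChunks(chunks, { beginWith = '', joinWith = '', endWith = '' } = {}) {
    return beginWith + chunks.join(joinWith) + endWith;
}

const joined = concatChunks(['a', 'b', 'c'], {
    beginWith: '<',
    joinWith: '|',
    endWith: '>',
});
console.log(joined); // <a|b|c>
```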

debug

Takes an Object, prints it (with its number), and throws the same object.

With ezs debug enabled: every object is stringified before being printed, and all other ezs debug traces are printed as well.

With ezs debug disabled: every object is inspected (indented and colorized) and printed on stderr (error level) or stdout (log level).

If the ezs parameter is set, objects are not logged (it is a global action).

Parameters

  • level String console level : log or error or silent (optional, default error)
  • text String text before the dump (optional, default valueOf)
  • path String? path of field to print
  • ezs Boolean? enable or disable ezs global debug traces

Returns Object

dedupe

Takes an Object and checks that its identifier has not already been used.

Parameters

  • data
  • feed
  • path String path containing the object Identifier (optional, default uri)
  • ignore Boolean Just ignore duplicate object (optional, default false)

Returns Object

delegate

Delegate processing to an external pipeline.

Note: works like spawn, but each chunk shares the same external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

dispatch

Dispatch processing to an external pipeline on one or more servers.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command

Returns Object

dump

Takes all Objects and generates a JSON array.

Input file:

[
    { "a": 1 },
    { "a": 2 },
    { "a": 3 },
    { "a": 4 },
    { "a": 5 }
]

Script:

[dump]
indent = true

Output:

 [{
   "a": 1
  },
  {
   "a": 2
  },
  {
   "a": 3
  },
  {
   "a": 4
  },
  {
   "a": 5
  }
]

Parameters

  • indent boolean indent JSON (optional, default false)

Returns String

env

Send the input object again, while adding new environment field(s) with the first Object of the feed.

Parameters

  • path String? path of the new field
  • value String? value of the new field

Returns Object

exchange

Takes an Object and throws a new item computed from the value parameter (the new item replaces the input one).

Input file:

[{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
},
{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
}]

Script:

[exchange]
value = omit('c')

Output:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Parameters

  • value String? value to replace input object

Returns Object

expand

Takes an Object and substitutes a field with the corresponding value found in an external pipeline. The internal pipeline receives a special object { id, value }, where id is the item identifier and value is the value at the item's path. The internal pipeline can expand value with another one.

Input file:

[
          { year: 2000, dept: 54 },
          { year: 2001, dept: 55 },
          { year: 2003, dept: 54 },
]

Script:

[use]
plugin = analytics

[expand]
path = dept
file = ./departement.ini

Output:

 [
          { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
          { year: 2001, dept: { id: 55, value: 'Meuse' } },
          { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
 ]

Parameters

  • path String? the path to substitute
  • size Number Number of chunks sent to the external pipeline at a time (optional, default 1)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cacheName String? Enable cache, with dedicated name
  • token String? add token values in the subpipeline (optional)

Returns Object

extract

Takes an Object and throws the value of each selected field.

Note: extract cannot throw undefined or null values.

Input file:

[{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
},
{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
}]

Script:

[extract]
path = a
path = b

Output:

[
   "abcdefg",
   "1234567",
   "abcdefg",
   "1234567"
]

Parameters

  • path String? path of field to extract

Returns Object

fork

Forks the current pipeline.

Note: each chunk is sent to the same external pipeline.

Parameters

  • standalone Boolean The current pipeline will be able to end without waiting for the end of the external pipeline (optional, default false)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

group

Take all chunks, and throw them grouped by length.

See also ungroup.

[
     "a",
     "b",
     "c",
     "d",
     "e",
     "f",
     "g",
     "h"
]

Script:

[group]
length = 3

Output:

[
     [ "a", "b", "c" ],
     [ "d", "e", "f" ],
     [ "g", "h" ]
]

Parameters

  • length Number? Size of each partition

Returns String
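
The partitioning shown above can be reproduced with a few lines of plain JavaScript. The function groupByLength is a hypothetical helper that works on an in-memory array, not the streaming ezs statement.

```javascript
// Hypothetical helper, not the ezs implementation: splits an array into
// consecutive partitions of at most `length` items.
function groupByLength(chunks, length) {
    if (!Number.isInteger(length) || length < 1) {
        throw new RangeError('length must be a positive integer');
    }
    const groups = [];
    for (let i = 0; i < chunks.length; i += length) {
        groups.push(chunks.slice(i, i + length));
    }
    return groups;
}

const grouped = groupByLength(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3);
console.log(JSON.stringify(grouped));
// [["a","b","c"],["d","e","f"],["g","h"]]
```

The last partition is simply shorter when the number of chunks is not a multiple of length.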

identify

Take Object, and compute & add an identifier

Parameters

  • data
  • feed
  • scheme String scheme to use (uid or sha) (optional, default uid)
  • path String path containing the object Identifier (optional, default uri)

Returns String

ignore

Takes all the chunks, and ignores the first N chunks.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[ignore]
length = 3

Output:

[{
   "a": 4
},
{
   "a": 5
}]

Parameters

  • length Number? Length of the feed to ignore

Returns any
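
As an illustration of the behaviour described above, a plain JavaScript generator can skip the first chunks of any iterable. The name ignoreFirst is hypothetical and the sketch does not use ezs.

```javascript
// Hypothetical helper, not the ezs implementation: lazily skips the
// first `length` chunks and passes every following chunk through.
function* ignoreFirst(chunks, length) {
    let index = 0;
    for (const chunk of chunks) {
        if (index >= length) {
            yield chunk;
        }
        index += 1;
    }
}

const input = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }];
const remaining = [...ignoreFirst(input, 3)];
console.log(JSON.stringify(remaining)); // [{"a":4},{"a":5}]
```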

keep

Throw input Object but keep only specific fields.

Input file:

[{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
},
{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
}]

Script:

[keep]
path = a
path = b

Output:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Parameters

  • path String? path of field to keep

Returns Object
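
A minimal sketch of the same idea in plain JavaScript, using a hypothetical keepFields helper. It only handles top-level field names and leaves out fields that are missing from the input; both points are simplifying assumptions, not documented ezs behaviour.

```javascript
// Hypothetical helper, not the ezs implementation.
// Assumptions: only top-level field names, missing fields are skipped.
function keepFields(obj, paths) {
    const kept = {};
    paths.forEach((path) => {
        if (Object.prototype.hasOwnProperty.call(obj, path)) {
            kept[path] = obj[path];
        }
    });
    return kept;
}

const record = { a: 'abcdefg', b: '1234567', c: 'XXXXXXX' };
const trimmed = keepFields(record, ['a', 'b']);
console.log(JSON.stringify(trimmed)); // {"a":"abcdefg","b":"1234567"}
```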

loop

Loops on an external pipeline until test is true.

Note: works like delegate, but each chunk uses its own external pipeline.

Parameters

  • test String? if test is true
  • reverse Boolean to reverse the test (optional, default false)
  • maxDepth Number to limit the number of loops (optional, default 100000)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

map

From an array field, delegates the processing of each item to an external pipeline.

Note: works like delegate, but each chunk uses its own external pipeline.

Parameters

  • path String? the path to substitute
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

metrics

  • See: ../server/knownPipeline.js

Takes an Object and throws the same object.

This statement is only used if:

  • EZS_METRICS is enabled
  • ezs is running in server mode

WARNING: avoid setting bucket to "input" or "output", as these labels are used by ezs. If you do, you risk distorting the associated metrics.

Parameters

  • pathName String to identify the script (optional, default auto)
  • bucket String to identify the moment of measurement (optional, default unknow)

Returns Object

overturn

Takes an Object and substitutes a field twice with the corresponding value found in an external pipeline. The internal pipeline receives a special object { id, value, token }:

  • id is the item identifier
  • value is the value at the item's path
  • token is an array containing the stream id and a number (0 for the first time, 1 for the second time)

The internal pipeline can overturn value with another one.

It works like [expand], but the second call starts only when all the values of the stream have been sent once.

Input file:

[
          { year: 2000, dept: 'Meuse' },
          { year: 2001, dept: 'Moselle' },
          { year: 2003, dept: 'Vosges'},
]

Script #1:

[overturn]
path = dept

[overturn/assign]
path = value
value = get('value').split('').reverse().join('')

Output:

 [
          { year: 2000, dept: 'Meuse' },
          { year: 2001, dept: 'Moselle' },
          { year: 2003, dept: 'Vosges' },
 ]

Script #2:

[overturn]
path = dept

[overturn/drop]
path = token.1
if = 0

[overturn/assign]
path = value
value = get('value').split('').reverse().join('')

Output:

 [
          { year: 2000, dept: 'esueM' },
          { year: 2001, dept: 'ellesoM' },
          { year: 2003, dept: 'segsoV' },
 ]

Parameters

  • path String? the path to overturn
  • size Number Number of chunks sent to the external pipeline at a time (optional, default 1)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command

Returns Object

pack

Takes all Objects and throws encoded Strings.

Returns String

parallel

Takes an Object and delegates its processing to X internal pipelines.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

pop

  • See: shift

Return the last Object and close the feed

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[pop]

Output:

[{
   "a": 5
}]

Returns Object

remove

Takes an Object and removes it from the feed if test is true.

Input file:

[{
   a: "a"
},
{
   a: 2
},
{
   a: "b"
},
{
   a: 4
},
{
   a: "c"
}]

Script:

[remove]
test = get('a').isInteger()
reverse = true

Output:

[
    {
       a: 2
    },
    {
       a: 4
    }
]

Parameters

  • test String? if test is true
  • reverse String reverse the test (optional, default false)

Returns Object
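
How test and reverse combine can be sketched in plain JavaScript. The helper removeWhen is hypothetical, and the test is passed as a JavaScript function instead of a script expression.

```javascript
// Hypothetical helper, not the ezs implementation: an item is removed when
// the test is true, or when the test is false if `reverse` is set.
function removeWhen(items, test, reverse = false) {
    return items.filter((item) => {
        const passed = Boolean(test(item));
        const shouldRemove = reverse ? !passed : passed;
        return !shouldRemove;
    });
}

const mixed = [{ a: 'a' }, { a: 2 }, { a: 'b' }, { a: 4 }, { a: 'c' }];
const isInteger = (item) => Number.isInteger(item.a);

// reverse = true: items that do NOT pass the test are removed.
const integersOnly = removeWhen(mixed, isInteger, true);
console.log(JSON.stringify(integersOnly)); // [{"a":2},{"a":4}]
```

Without reverse, the same test would remove the integers and keep the three string values.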

replace

Take Object and replace it with a new object with some fields.

See also exchange and assign.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[replace]
path = b.c
value = 'X'

Output:

[{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
}]

Parameters

  • path String? path of the new field
  • value String? value of the new field

Returns Object

shift

Return the first Object and close the feed

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[shift]

Output:

[{
   "a": 1
}]

Returns Object

shuffle

Take Object, shuffle data of the whole object or only some fields specified by path

Input file:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Script:

[shuffle]
path = a

Output:

[{
   "a": "cadbefg",
   "b": "1234567"
},
{
   "a": "dcaegbf",
   "b": "1234567"
}]

Parameters

  • path String? path of field to shuffle

Returns Object

singleton

Takes only the first Object and delegates its processing to an external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

spawn

Delegate processing to an external pipeline, throw each chunk from the result.

Note: works like delegate, but each chunk uses its own external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cache String? Use a specific ezs statement to run commands (advanced)

Returns Object

swing

Delegate processing to an external pipeline under specific conditions.

Note: works like spawn, but each chunk shares the same external pipeline.

Parameters

  • test String? if test is true
  • reverse String reverse the test (optional, default false)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

throttle

Takes an Object and returns the same object, limiting how many objects are sent per second.

Input file:

[
         { id: 'x', value: 2 },
         { id: 't', value: 2 },
]

Script:

[use]
plugin = analytics

[throttle]
bySecond = 2

Output:

[
         { id: 'x', value: 2 },
         { id: 't', value: 2 },
]

Parameters

  • bySecond Number Number of objects per second (optional, default 1)

Returns Object

time

Measure the execution time of a script, on each chunk of input.

Parameters

Examples

Input

[1]

Program

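// Assumed imports: the `from` npm package (array to stream) and this package.
const from = require('from');
const ezs = require('@ezs/core');

const script = `
[transit]
`;
from([1])
    .pipe(ezs('time', { script }))
    .on('data', console.log); // print each timed chunk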

Output

[{
  data: 1,
  time: 15 // milliseconds
}]

Returns object

tracer

Take Object, print a character and throw the same object. Useful to see the progress in the stream.

Parameters

  • print String character to print at each object (optional, default .)
  • last String character to print at last call (optional, default .)
  • first String character to print at first call (optional, default .)

Returns Object

transit

Take Object and throw the same object again.

Input file:

[{
   "a": 1
},
{
   "a": 2
}]

Script:

[transit]

Output:

[{
   "a": 1
},
{
   "a": 2
}]

Returns Object

truncate

Takes all the chunks, and closes the feed when the total length is equal to the parameter.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[truncate]
length = 3

Output:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
}]

Parameters

  • length Number? Length of the feed

Returns any
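
Closing the feed early means the source is not read any further. The plain JavaScript sketch below shows this with a generator; truncateAfter and naturals are hypothetical names, and the code does not use ezs.

```javascript
// Hypothetical helper, not the ezs implementation: passes chunks through
// and stops reading the source once `length` chunks have been sent.
function* truncateAfter(chunks, length) {
    if (length <= 0) {
        return;
    }
    let sent = 0;
    for (const chunk of chunks) {
        yield chunk;
        sent += 1;
        if (sent >= length) {
            return; // closes the feed; the source is not consumed further
        }
    }
}

// An endless source: truncation is what makes the spread below terminate.
function* naturals() {
    let n = 1;
    while (true) {
        yield n;
        n += 1;
    }
}

const firstThree = [...truncateAfter(naturals(), 3)];
console.log(firstThree); // [ 1, 2, 3 ]
```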

ungroup

Take all chunks, and throw one item for every chunk.

See also group.

[
     [ "a", "b", "c" ],
     [ "d", "e", "f" ],
     [ "g", "h" ]
]

Script:

[ungroup]

Output:

[
     "a",
     "b",
     "c",
     "d",
     "e",
     "f",
     "g",
     "h"
]

Returns Array<any>
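
The inverse of group can be sketched as a one-level flattening in plain JavaScript. The generator ungroupChunks is hypothetical; passing non-array chunks through unchanged is an assumption, not documented behaviour.

```javascript
// Hypothetical helper, not the ezs implementation: emits one item for
// every element of each array chunk (flattening a single level).
// Assumption: chunks that are not arrays are passed through unchanged.
function* ungroupChunks(chunks) {
    for (const chunk of chunks) {
        if (Array.isArray(chunk)) {
            yield* chunk;
        } else {
            yield chunk;
        }
    }
}

const flat = [...ungroupChunks([['a', 'b', 'c'], ['d', 'e', 'f'], ['g', 'h']])];
console.log(JSON.stringify(flat)); // ["a","b","c","d","e","f","g","h"]
```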

unpack

Takes Strings or Buffers and throws the Objects built by JSON.parse on each line.

Returns object
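
Parsing "one JSON document per line" from a stream has one subtlety: a line may be split across two chunks. The plain JavaScript sketch below buffers the unfinished line between chunks. The generator unpackLines is hypothetical and is not the ezs implementation; skipping blank lines is an assumption.

```javascript
// Hypothetical helper, not the ezs implementation: parses newline-delimited
// JSON from string (or Buffer) chunks, keeping the unfinished last line of
// each chunk until the rest of it arrives.
function* unpackLines(chunks) {
    let pending = '';
    for (const chunk of chunks) {
        pending += chunk.toString();
        const lines = pending.split('\n');
        pending = lines.pop(); // possibly incomplete last line
        for (const line of lines) {
            if (line.trim() !== '') { // assumption: blank lines are skipped
                yield JSON.parse(line);
            }
        }
    }
    if (pending.trim() !== '') {
        yield JSON.parse(pending);
    }
}

// The second object is deliberately split across two chunks.
const unpacked = [...unpackLines(['{"a":1}\n{"a"', ':2}\n', '{"a":3}'])];
console.log(JSON.stringify(unpacked)); // [{"a":1},{"a":2},{"a":3}]
```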

use

Loads one or more plugins so that their statements can be used in the rest of the script.

Script:

[use]
plugin = basics
plugin = analytics

Parameters

  • plugin String? name of the plugin to load (can be repeated)

Returns String

validate

From an Object, throw the same object if all rules pass

Input file:

[{
   "a": 1,
   "b": "titi"
},
{
   "a": 2,
   "b": "toto"
},
{
   "a": false
}]

Script:

[validate]
path = a
rule = required|number

path = b
rule = required|string

Output:

[{
   "a": 1,
   "b": "titi"
},
{
   "a": 2,
   "b": "toto"
}]

Parameters

  • path String? path of the field
  • rule String? rule to validate the field

Returns Object

License

MIT

Unpacked Size

227 kB

Total Files

73

Collaborators

  • parmentf
  • touv
  • jj618