node package manager

@yaga/etl

Yaga - ETL

ETL stream library for geo-spatial purposes. It is written in TypeScript.

How to install

npm install @yaga/etl

Available Streams

Spatialite

Reader and Writer stream for spatialite

Reader example

TypeScript:

import { Database } from 'spatialite';
import { SpatialiteReadStream } from '@yaga/etl';
 
const db: Database = new Database(':memory:');
db.spatialite((err: Error): void => {
    if (err) {
        // Error handling...
        return;
    }
    
    const stream = SpatialiteReadStream({
        db,
        query: 'SELECT * FORM table_name;' // You are able to make spatial queries
    });
    
    stream.on('error', (err: Error): void => {
        // Error handling...
    });
    stream.on('data', (row: any): void => {
        // receive each row as object
    });
});
 

JavaScript:

const spatialite = require('spatialite');
const etl = require('@yaga/etl');
 
const db = new spatialite.Database(':memory:');
db.spatialite(function (err) {
    if (err) {
        // Error handling... 
        return;
    }
    
    const stream = SpatialiteReadStream({
        db: db,
        query: 'SELECT * FORM table_name;' // You are able to make spatial queries 
    });
    
    stream.on('error', function (err) {
        // Error handling... 
    });
    stream.on('data', function (row) {
        // receive each row as object 
    });
});
 
 

Writer example

TypeScript:

import { Database } from 'spatialite';
import { SpatialiteWriteStream } from '@yaga/etl';
 
const db: Database = new Database(':memory:');
db.spatialite((err: Error): void => {
    if (err) {
        // Error handling...
        return;
    }
    
    const stream = SpatialiteWriteStream({
        db,
        fallbackSrid: '4326',
        mapping: {
            geometry: ['geometry', 'geometry'],
            int: ['int', 'int'],
            num: ['num', 'num'],
            real: ['real', 'real'],
            text: ['text', 'text']
        },
        table: 'test-table'
    });
    
    stream.write({
        geometry: 'SRID=3857;POINT(1 1)',
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    stream.write({
        geometry: 'SRID=4326;POINT(1 1)',
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    stream.write({
        geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call.
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    // Also (stringify and pared) GeoJSON as geometry is supported including crs.
    
    stream.end(); 
});
 

JavaScript:

const spatialite = require('spatialite');
const etl = require('@yaga/etl');
 
const db = new spatialite.Database(':memory:');
db.spatialite(function (err) {
    if (err) {
        // Error handling... 
        return;
    }
    
    const stream = SpatialiteWriteStream({
        db: db,
        fallbackSrid: '4326',
        mapping: {
            geometry: ['geometry', 'geometry'],
            int: ['int', 'int'],
            num: ['num', 'num'],
            real: ['real', 'real'],
            text: ['text', 'text']
        },
        table: 'test-table'
    });
    
    stream.write({
        geometry: 'SRID=3857;POINT(1 1)',
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    stream.write({
        geometry: 'SRID=4326;POINT(1 1)',
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    stream.write({
        geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call. 
        int: 5,
        num: 5.432,
        real: 9.87654321,
        text: 'EWKT'
    });
    // Also (stringify and pared) GeoJSON as geometry is supported including crs. 
    
    stream.end(); 
});
 

Postgres

Reader and Writer stream for postgres / postgis databases

Reader example

TypeScript:

import { Pool } from 'pg';
import { PostgresReadStream } from '@yaga/etl';
 
const db: Pool = new Pool({
    host: 'localhost',
    database: 'yaga',
    idleTimeoutMillis: 30000,
    max: 10,
    password: 'yaga',
    port: 5432,
    user: 'yaga'
});
const stream = PostgresReadStream({
        db,
        query: 'SELECT * FORM table_name;' // You are able to make spatial queries
    });
    
    stream.on('error', (err: Error): void => {
        // Error handling...
    });
    stream.on('data', (row: any): void => {
        // receive each row as object
});
 

JavaScript:

const pg = require('pg');
const etl = require('@yaga/etl');
 
const db = new pg.Pool({
    host: 'localhost',
    database: 'yaga',
    idleTimeoutMillis: 30000,
    max: 10,
    password: 'yaga',
    port: 5432,
    user: 'yaga'
});
const stream = PostgresReadStream({
    db: db,
    query: 'SELECT * FORM table_name;' // You are able to make spatial queries 
});
 
stream.on('error', function (err) {
    // Error handling... 
});
stream.on('data', function (row) {
    // receive each row as object 
})
 
 

Writer example

TypeScript:

import { Pool } from 'pg';
import { PostgresWriteStream } from '@yaga/etl';
 
const db: Pool = new Pool({
    host: 'localhost',
    database: 'yaga',
    idleTimeoutMillis: 30000,
    max: 10,
    password: 'yaga',
    port: 5432,
    user: 'yaga'
});
const stream = PostgresWriteStream({
    db,
    fallbackSrid: '4326',
    mapping: {
        geometry: ['geometry', 'geometry'],
        int: ['int', 'int'],
        num: ['num', 'num'],
        real: ['real', 'real'],
        text: ['text', 'text']
    },
    table: 'test-table'
});
 
stream.write({
    geometry: 'SRID=3857;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'SRID=4326;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call.
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
// Also (stringify and pared) GeoJSON as geometry is supported including crs.
 
stream.end(); 
 

JavaScript:

const pg = require('pg');
const etl = require('@yaga/etl');
 
const db = new pg.Pool({
    host: 'localhost',
    database: 'yaga',
    idleTimeoutMillis: 30000,
    max: 10,
    password: 'yaga',
    port: 5432,
    user: 'yaga'
});
const stream = PostgresWriteStream({
    db: db,
    fallbackSrid: '4326',
    mapping: {
        geometry: ['geometry', 'geometry'],
        int: ['int', 'int'],
        num: ['num', 'num'],
        real: ['real', 'real'],
        text: ['text', 'text']
    },
    table: 'test-table'
});
 
stream.write({
    geometry: 'SRID=3857;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'SRID=4326;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call. 
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
// Also (stringify and pared) GeoJSON as geometry is supported including crs. 
 
stream.end(); 
 

Solr

Reader and Writer stream for solr index

Reader example

TypeScript:

import { Client, createClient } from 'solr-client';
import { SolrReadStream } from '@yaga/etl';
 
const db: Client = createClient({
    core: 'yaga-etl',
    host: 'localhost',
    path: '/solr',
    port: 8983,
    solrVersion: '5.1'
});
const stream = SolrReadStream({
    db,
    query: '*:*'
});
 
stream.on('error', (err: Error): void => {
    // Error handling...
});
stream.on('data', (row: any): void => {
    // receive each row as object
});
 

JavaScript:

const spatialite = require('spatialite');
const etl = require('@yaga/etl');
 
const db = new spatialite.Database(':memory:');
const stream = SolrReadStream({
    db: db,
    query: '*:*'
});
 
stream.on('error', function (err) {
    // Error handling... 
});
stream.on('data', function (row) {
    // receive each row as object 
});
 
 

Writer example

TypeScript:

import { Client, createClient } from 'solr-client';
import { SolrWriteStream } from '@yaga/etl';
 
const db: Client = createClient({
    core: 'yaga-etl',
    host: 'localhost',
    path: '/solr',
    port: 8983,
    solrVersion: '5.1'
});
const stream = SolrWriteStream({
    db,
    fallbackSrid: '4326',
    mapping: {
        geometry: ['geometry', 'geometry'],
        int: ['int', 'int'],
        num: ['num', 'num'],
        real: ['real', 'real'],
        text: ['text', 'text']
    },
    table: 'test-table'
});
 
stream.write({
    geometry: 'SRID=3857;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'SRID=4326;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call.
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
// Also (stringify and pared) GeoJSON as geometry is supported including crs.
 
stream.end(); 
 

JavaScript:

const solr = require('solr-client');
const etl = require('@yaga/etl');
 
const db = createClient({
    core: 'yaga-etl',
    host: 'localhost',
    path: '/solr',
    port: 8983,
    solrVersion: '5.1'
});
const stream = SolrWriteStream({
    db: db,
    fallbackSrid: '4326',
    mapping: {
        geometry: ['geometry', 'geometry'],
        int: ['int', 'int'],
        num: ['num', 'num'],
        real: ['real', 'real'],
        text: ['text', 'text']
    },
    table: 'test-table'
});
 
stream.write({
    geometry: 'SRID=3857;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'SRID=4326;POINT(1 1)',
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
stream.write({
    geometry: 'POINT(1 1)', // Fallback to EPSG 4326, like defined in constructor call. 
    int: 5,
    num: 5.432,
    real: 9.87654321,
    text: 'EWKT'
});
// Also (stringify and pared) GeoJSON as geometry is supported including crs. 
 
stream.end(); 
 

JSONTransform

Transform stream for any kind of JSON

TypeScript:

import { JSONTransformStream } from '@yaga/etl';
 
const stream: JSONTransformStream = new JSONTransformStream({
    fn: (data: any) => {
        data.test = true;
        return data;
    }
});
 
stream.on('data', (chunk: any) => {
    console.log(chunk, 'is comparable to', {any: 'data', test: true});
});
 
 
stream.write({
    any: 'data'
});
 
stream.end(); 
 

JavaScript:

const etl = require('@yaga/etl');
 
const stream = new etl.JSONTransformStream({
    fn: function (data) {
        data.test = true;
        return data;
    }
});
 
stream.on('data', function (chunk) {
    console.log(chunk, 'is comparable to', {any: 'data', test: true});
});
 
 
stream.write({
    any: 'data'
});
 
stream.end(); 
 

Known Issues

  • The package epsg-to-proj has a JSON file as main-entrypoint. That is not supported by TypeScript. A workaround is to change the main entrypoint in the package.json to "main": "index.js" and create a simple index.js that just exports and requires the json file.