rx-extra
TypeScript icon, indicating that this package has built-in type declarations

2.1.4 • Public • Published

rx-extra

rx-extra extends the RxJS 5 library with extra methods like fromNodeReadableStream and splitOnChange. If you're still using RxJS 4, you can find the compatible version of rx-extra in the RxJS4 branch.

Install

npm install --save rx-extra

Methods

fromNodeReadableStream

import { Observable } from 'rxjs/Observable';
import 'rx-extra/add/observable/fromNodeReadableStream';
import 'rxjs/add/operator/map';
 
Observable.fromNodeReadableStream(myStream)
  .map(x => objectMode ? x : x.toString())
  .subscribe(console.log, console.error);

partitionNested

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/partitionNested';
 
let partitioned = Observable.range(1, 20)
  .partitionNested(x => x % 2 === 0);
 
let [evenSource, oddSource] = partitioned;
 
let [multipleOfSixSource, notMultipleOfSixSource] = partitioned
  .partitionNested(x => x % 3 === 0);
 
multipleOfSixSource
  .subscribe(console.log, console.error);

splitOnChange

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/from';
import 'rx-extra/add/operator/splitOnChange';
 
Observable.from([{
  value: 5,
}, {
  value: 5,
}, {
  value: 6,
}, {
  value: 7,
}
}])
  .splitOnChange(function(item) {
    return item.value;
  })
  .subscribe(console.log, console.error);

then (Promise)

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/then';
 
Observable.range(1, 3)
  .then(console.log, console.error);

throughNodeStream

import * as JSONStream from 'JSONStream';
import { Observable } from 'rxjs/Observable';
import 'rx-extra/add/operator/throughNodeStream';
 
Observable.from([
  '[{"a": 1},',
  '{"a": 2},{"a":',
  '3},',
  '{"a": 4}]'
])
  .throughNodeStream(JSONStream.parse('..a'))
  .subscribe(console.log, console.error);

If the objectMode option for your transform stream is not true, you will need to handle any required conversion(s) between String|Buffer and Number|Boolean|Object, e.g.:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/map';
import 'rx-extra/add/operator/throughNodeStream';
import * as through2 from 'through2';
 
Observable.range(1, 3)
  .map(x => x.toString())
  .throughNodeStream(through2(function(chunk, enc, callback) {
    var that = this;
    let x = parseInt(chunk.toString());
    for (var i=0; i<x; i++) {
      that.push(String(+ 1));
    }
    callback();
  }))
  .map(x => parseInt(x.toString()))
  .subscribe(console.log, console.error);

toNodeCallback

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rx-extra/add/operator/toNodeCallback';
 
Observable.range(1, 3)
  .toNodeCallback(function(err, result) {
    if (err) {
      throw err;
    }
    console.log(result);
  });

Readme

Keywords

Package Sidebar

Install

npm i rx-extra

Weekly Downloads

3

Version

2.1.4

License

MIT

Unpacked Size

162 kB

Total Files

81

Last publish

Collaborators

  • jacobwindsor
  • ariutta
  • alexanderpico
  • larsgw