TypeScript icon, indicating that this package has built-in type declarations

1.0.0-beta.1 • Public • Published

task driver

Higher order factory for creating cycle.js async request/response drivers.

npm (scoped)

Notice that API of version 2.x has significantly changed since 1.x, which where using rxjs4 and has some excessive features that where removed in 2.x.

Currently in beta, so use:

npm install cycle-driver/task@beta -S

Allows you easily create fully functional cycle.js driver which side effect is executed using async function with promise or callback. It also can serve as a simple backbone for your more sophisticated driver.

Such driver will work in the same manner as official cycle HTTP driver, so basically:

  • driver sink will expect stream of requests
  • driver source will provide you with "metastream" of responses
  • driver source will provide standard isolate mechanics (@cycle/isolate)
  • driver will be compatible with any stream library that cycle.js can work with (it doesn't use any under the hood, but uses cyclejs stream adapter's API).



Async driver factory.

  • getResponse (Function): function that takes request object as first param and callback as second param returns any kind of Promise or uses passed node style callback to give back async response for passed request. required (if no getProgressiveResponse)

  • getProgressiveResponse (Function): function that takes request object as first param and observer object as second param which allows to create a custom response containing more then one value. required (if no getResponse)

  • normalizeRequest (Function): transform function that will be applied to the request before handling.

  • isolateMap (Function): transform function that will be applied to the request before its isolation, if not present normalizeRequest will be used instead.

  • lazy (Boolean): makes all driver requests lazy by default, can be overridden by particular request options. default value: false


Basic example

Let's create cycle.js driver which will be able to read files using node.js fs module:

import {makeAsyncDriver} from 'cycle-async-driver'
import fs from 'fs'
import {run} from '@cycle/rx-run' 
import {Observable as O} from 'rx'
let readFileDriver = makeAsyncDriver((request, callback) => {
  fs.readFile(requst.path, request.encoding || 'utf-8', callback)
  // instead of using `callback` param you may return Promise 


const Main = ({readFile}) => {
  return {
    readFile: O.of({path: '/path/to/file/to/read'}),
    output: read
      // select() is used to get all response$ streams
      // you may also filter responses by `category` field 
      // or by request filter function      
      // as we get metastream of responses - 
      // we should flatten it to get file data 

run(Main, {
  output: (ouput$) => ouput$.forEach(::console.log) 
  readFile: readFileDriver

Metastream of responses

makeAsyncDriver creates a driver function which accepts stream of requests (request$) and returns driver source which is an object that you use to access responses that come from the driver.

To access responses driver sources provides special selector method (select is default name) which takes nothing or string category (default name of selector property) and returns stream with all responses or filtered by request's category field.

Stream returned by select() is a metastream of responses. This means that each element of it is a stream itself, so it is a stream of streams and usually referred as response$$ (stream of response$). Each element of it is response$ stream that produces resulting values originated from particular request you send to driver sink (request$).

// to get plain response data you should flatten metastream of repsonses
  .select('something-special') // returns metastream of responses (response$$)
  .mergeAll() // gets flatten stream of repsonses data

Each response$ has attached property request (default name) which contains corresponding normalized request. In simple case this stream produces only one resulting value (actual response data). In case of progressive response it may produce multiple values before completion.

Also driver source has method filter witch takes filtering function for response$$ metastream and returns filtered driver source.

// is some cases you may want to get filtered source
  .filter(r$$ => r$$.request.method === 'DELETE') // returns filtered driver source
  .select() // gets all response$$ stream  

Each of response$ streams potentially may produce an error which should be properly handled. Metastream response$$ will produce an error only if request$ produces it and will end when request$ stream completes.


By default driver source will provider standard isolation strategy based on scoped namespaces. For this to each request object passed though isolated component boundaries isolation scope value is attached to it using special property _namespace (default name). Isolated driver source will automatically filter responses corresponding to requests belonging to isolation scope. So parent components have access to isolated child's responses.

Requests error handling

As it was said that select() method returns metastream of responses (response$$), which produces response streams (response$) each of which may produce an error if something goes wrong while performing request.

It was mentioned also that to for responses you need eventually to flatten metastream of responses. But notice that if you handle/catch an error on the flattened response$$ like that:

// rxjs
  .select() // responses$$ stream
  .mergeAll() // flatten stream of all plain responses
  .catch(error => of({error})) // replace the error
  // this stream will not have exception 
  // but will end right after first error caught

the stream will be completed and you won't get there anything after first error.

So usually for proper handling you need to handle error on each response stream (response$), for example like that:

// xstream
  .select() // responses$$ stream
  .map(r$ => r$ // catch error for each response$
    .map(success => ({success}))
    .replaceError(error => ({error}))

One of the recommended methods of dealing with successful and failed requests from driver is to use simple helpers that will leave requests with needed result:

// rxjs 
let failure = r$ => r$.skip().catch(of)
let success = r$ => r$.catch(empty)
// xstream
let failure = r$ => r$.drop().replaceError(xs.of)
let success = r$ => r$.replaceError(xs.empty)

Then you can get only successful responses without errors:

// rxjs
// xstream

Accessing response/request pairs

Sometimes you may find yourself in a need to access corresponding response and request pairs, to do this follow such approach:

  const getPairs = r$ => r$.map(res => ({res, req: r$.request}))  
  // get all succesfful response/request pairs
  let goodResReqPair$ = asyncDriver.select()

You can even create something like that:

  // create such success mapper factory
  const success = (mapper = (_ => _)) => 
    r$ => r$.catch(empty).map(res => mapper(res, r$.request))
  // map succesfful response/request pair to something
  let goodReqResMapped$ = asyncDriver.select()
      (response, request) => ...

It is all functional approach. Compose functions as you feel it needs to be.

Lazy drivers and requests

By default all requests are eager (start to perform a side effect just after they get "into" the driver) and response streams (which correspond to particular request) are hot (multicated) and remembered (has short memory) which means that any number of subscriber may listen to response stream and while only one request will be performed all the subscribers will get response value(s), even late subscribers will get the one last value from response stream (you should consider this when dealing with progressive responses).

Lazy request on the other hand starts performing side effect when they get subscriber. Depending of the stream library you use for lazy requests you will get either cold (rxjs, most) or hot (xstream - where all streams are hot) response$ stream.

Note that if you subscribe to lazy and cold stream (response$) in you app, request will start to be performed each time you subscribe to it, this thing is very important to consider (when using rx.js, most.js).

To get lazy driver just pass lazy option set to true, so all requests by default will be lazy:

let readFileDriver = makeAsyncDriver({
  getResponse: request, callback) => {
    fs.readFile(requst.path, request.encoding || 'utf-8', callback)   
  lazy: true

Or you can always override driver setting and make any request lazy (or eager) if required by adding lazy: true (or lazy: false) option to the request inside your app's logic:

  readFile: O.of({
   path: '/path/to/file/to/read',
   lazy: true

Cancellation (and abortion)

Basically, when you want request to be cancelled you should stop listening to corresponding response$ (response stream). By default request in drivers are eager thus start without subscription in you apps logic.

That said request cancellation works only for lazy requests because such requests start on subscription creation and may be cancelled/aborted if subscription is dropped before request is finished to performed.

Often automatic subscription drop is done using flattening the stream of responses to the latest response:

// rxjs
  .flatMapLatest() // or .switch()
  // so here you will get only responses from last request started
  // you won't see responses of the requets that started before 
  // the last one and were not finished before  

If you want to implement driver that on cancellation makes some action - for example aborts not completed requests, you should follow this approach:

import {makeAsyncDriver} from 'cycle-async-driver'
// this example also shows you how to use `getProgressiveResponse`
// say we have some `coolSource` to which we can make requests
// and get some response stream back, 
// and we want translate it to a cycle driver
let myCoolDriver = makeAsyncDriver({
  getProgressiveResponse: (request, observer, onDispose) => {  
    const coolRequest = coolSource.makeRequest(request, (coolStream) => {
      coolStream.on('data', observer.next)
      coolStream.on('error', observer.error)
      coolStream.on('end', observer.completed)
    // third param of `getResponse` or `getProgressiveResponse`
    // is a function `onDispose` that takes
    // a handler which will be called when no listeners 
    // is needing response for this request anymore,   
    // in this case if it happens before the request is completed 
    // you may want to abort it
    onDispose(() => !coolRequest.isCompleted() && coolRequest.abort())   

Note that onDispose handler will be called always when request completed successfully.


npm install
npm run test

For running test in dev mode with watching node-dev should be installed globally (npm i node-dev -g)

Package Sidebar


npm i @cycle-drivers/task

Weekly Downloads






Last publish


  • cycle-drivers