    @thi.ng/csp

    2.1.30 • Public • Published

    csp

    This project is part of the @thi.ng/umbrella monorepo.

    About

    ES6 promise based CSP primitives & operations.

    • Channel with/without buffering and/or transducers
      • optional channel IDs
      • choice of buffer behaviors (fixed, sliding, dropping)
      • channel selection
      • channel merging (many-to-one, serial or parallel)
      • channel piping (w/ transducers)
      • timeouts / sleeping / throttling / delaying
      • prepopulated channel ctors (iterators, ranges, promise, constants etc.)
    • Mult for channel multiplexing (one-to-many splitting)
      • individual transducers per tap
      • dynamic add/removal of taps
    • PubSub for topic subscriptions
      • each topic implemented as Mult
      • wildcard topic for processing fallthrough messages
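
The three buffer behaviors named in the list above differ only in what happens when a write arrives at a full buffer. The following is a minimal standalone sketch of that difference. It does not use or reproduce the package's own buffer classes: `BoundedBuffer`, `Policy`, `WriteResult` and `fill` are hypothetical names chosen for illustration, and the "blocked" result stands in for what a real channel would do, namely make the writer wait.

```typescript
// Illustrative only: NOT the @thi.ng/csp buffer implementation.
type Policy = "fixed" | "sliding" | "dropping";
type WriteResult = "stored" | "dropped" | "blocked";

class BoundedBuffer<T> {
    private items: T[] = [];
    private capacity: number;
    private policy: Policy;

    constructor(capacity: number, policy: Policy) {
        this.capacity = capacity;
        this.policy = policy;
    }

    // Attempts to store a value and reports what happened to it.
    write(value: T): WriteResult {
        if (this.items.length < this.capacity) {
            this.items.push(value);
            return "stored";
        }
        if (this.policy === "sliding") {
            // full: evict the oldest value to make room for the newest
            this.items.shift();
            this.items.push(value);
            return "stored";
        }
        // full: "dropping" discards the new value, "fixed" refuses it
        // (a real channel would keep the writer pending instead)
        return this.policy === "dropping" ? "dropped" : "blocked";
    }

    // Returns all buffered values (oldest first) and empties the buffer.
    drain(): T[] {
        const out = this.items;
        this.items = [];
        return out;
    }
}

// Writes 1..count into a buffer and returns what it retained.
function fill(policy: Policy, capacity = 3, count = 5): number[] {
    const buf = new BoundedBuffer<number>(capacity, policy);
    for (let i = 1; i <= count; i++) buf.write(i);
    return buf.drain();
}
```

With a capacity of 3 and five writes, `fill("sliding")` retains the newest values `[3, 4, 5]`, while `fill("dropping")` and `fill("fixed")` both retain `[1, 2, 3]`; the latter two differ only in whether the writer is told the value was discarded or that it has to wait.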

    Status

    DEPRECATED - no further development planned

    Search or submit any issues for this package

    This package is deprecated. See @thi.ng/rstream for a similar but actively maintained alternative.

    Related packages

    • @thi.ng/rstream - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines

    Installation

    yarn add @thi.ng/csp

    ES module import:

    <script type="module" src="https://cdn.skypack.dev/@thi.ng/csp"></script>

    Skypack documentation

    For Node.js REPL:

    # with flag only for < v16
    node --experimental-repl-await
    
    > const csp = await import("@thi.ng/csp");
    

    Package sizes (brotli'd, pre-treeshake): ESM: 2.41 KB

    Dependencies

    API

    Generated API docs

    File loading & word frequency analysis

    import { Channel, Mult } from "@thi.ng/csp";
    import * as tx from "@thi.ng/transducers";
    import * as fs from "fs";
    
    // compose transducer to split source file into words
    // and filter out short strings
    const proc: tx.Transducer<string, string> = tx.comp(
        tx.mapcat((src: string) => src.toLowerCase().split(/[^\w]+/g)),
        tx.filter((w: string) => w.length > 1)
    );
    
    // define a channel which receives file paths
    // and resolves them with their contents
    const paths = new Channel<any>(
        tx.map((path: string) =>
            new Promise<string>(
                resolve => fs.readFile(path, (_, data) => resolve(data.toString()))
            )
        )
    );
    
    // define multiplexed output channel
    // items in this channel will have this form: `[word, count]`
    const results = new Mult("results");
    
    // tap result channel and sum word counts
    const counter = results
        .tap(tx.map(x => x[1]))
        .reduce(tx.add());
    
    // 2nd output channel with streaming sort transducer
    // (using a sliding window size of 500 items) and dropping
    // words with < 20 occurrences
    const sorted = results.tap(
        tx.comp(
            tx.streamSort(500, x => x[1]),
            tx.dropWhile(x => x[1] < 20)
        )
    );
    
    // define workflow:
    // pipe source files into a new channel and
    // reduce this channel using the `frequencies` reducer,
    // then stream the resulting map (word frequencies)
    // into the `results` mult, which feeds both taps
    // (`freqs` is a JS Map and is iterable)
    paths.pipe(proc)
        .reduce(tx.frequencies())
        .then(freqs => results.channel().into(freqs));
    
    // start tracing sorted outputs and
    // wait for all to finish
    Promise
        .all([sorted.consume(), counter])
        .then(([_, num]) => console.log("total words:", num));
    
    // no real work has been executed thus far (only scheduled via promises)
    // now kick off entire process by writing file paths into the 1st channel
    paths.into(["src/channel.ts", "src/mult.ts", "src/pubsub.ts"]);
    
    // results-tap1 : [ 'let', 20 ]
    // results-tap1 : [ 'topic', 20 ]
    // results-tap1 : [ 'chan', 20 ]
    // results-tap1 : [ 'number', 22 ]
    // results-tap1 : [ 'buf', 23 ]
    // results-tap1 : [ 'length', 23 ]
    // results-tap1 : [ 'tx', 25 ]
    // results-tap1 : [ 'state', 25 ]
    // results-tap1 : [ 'from', 27 ]
    // results-tap1 : [ 'close', 28 ]
    // results-tap1 : [ 'new', 33 ]
    // results-tap1 : [ 'args', 34 ]
    // results-tap1 : [ 'id', 36 ]
    // results-tap1 : [ 'any', 36 ]
    // results-tap1 : [ 'src', 38 ]
    // results-tap1 : [ 'if', 40 ]
    // results-tap1 : [ 'return', 47 ]
    // results-tap1 : [ 'channel', 57 ]
    // results-tap1 : [ 'this', 120 ]
    // results-tap1 done
    // total words: 1607
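
For readers who want to see what the pipeline above computes without the channel machinery, here is a synchronous sketch of the same three steps: split into lowercase words, discard words shorter than two characters, count occurrences, then keep and sort the frequent ones. It uses no library code; `wordFrequencies` and `frequentWords` are hypothetical helper names, and the full sort plus filter is a simplification of the windowed `streamSort` / `dropWhile` combination used above.

```typescript
// Counts how often each word occurs, using the same tokenization
// rule as the example above (split on non-word characters).
function wordFrequencies(src: string, minLength = 2): Map<string, number> {
    const freqs = new Map<string, number>();
    for (const w of src.toLowerCase().split(/[^\w]+/g)) {
        if (w.length < minLength) continue; // also skips empty strings
        freqs.set(w, (freqs.get(w) ?? 0) + 1);
    }
    return freqs;
}

// Returns `[word, count]` pairs with at least `minCount` occurrences,
// sorted by ascending count.
function frequentWords(
    freqs: Map<string, number>,
    minCount: number
): [string, number][] {
    const pairs: [string, number][] = [];
    freqs.forEach((count, word) => {
        if (count >= minCount) pairs.push([word, count]);
    });
    return pairs.sort((a, b) => a[1] - b[1]);
}
```

The channel-based version earns its extra complexity when inputs arrive asynchronously (file reads) and several consumers need the same results; for a single in-memory string the plain functions are sufficient.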

    Channel merging

    Channel.merge([
        Channel.range(0, 3),
        Channel.range(10, 15),
        Channel.range(100, 110)
    ]).reduce(tx.push()).then(console.log);
    
    // [ 0, 100, 101, 102, 103, 1, 2, 104, 105, 10, 11, 12, 13, 106, 14, 107, 108, 109 ]
    
    // emit tuples of values read from all inputs
    // preserves ordering of all inputs, but
    // throughput controlled by slowest input
    // by default stops & closes when any of the inputs closes
    Channel.mergeTuples([
        Channel.from([1, 2, 3]),
        Channel.from([10, 20, 30, 40]),
        Channel.from([100, 200, 300, 400, 500])
    ]).consume();
    
    // chan-3 : [ 1, 10, 100 ]
    // chan-3 : [ 2, 20, 200 ]
    // chan-3 : [ 3, 30, 300 ]
    // chan-3 done
    
    // same as above, however continues until all inputs are closed
    Channel.mergeTuples([
        Channel.from([1, 2, 3]),
        Channel.from([10, 20, 30, 40]),
        Channel.from([100, 200, 300, 400, 500])
    ], null, false).consume();
    
    // chan-3 : [ 1, 10, 100 ]
    // chan-3 : [ 2, 20, 200 ]
    // chan-3 : [ 3, 30, 300 ]
    // chan-3 : [ undefined, 40, 400 ]
    // chan-3 : [ undefined, undefined, 500 ]
    // chan-3 done
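
The two tuple-merging outputs above can be reproduced on plain arrays, which may help to see the difference between the two closing behaviors. This is only a synchronous sketch of the resulting tuples, not of the channel coordination itself; `zipTuples` and its `closeOnFirst` flag are names chosen for this illustration and are not taken from the package.

```typescript
// Builds tuples from the i-th element of every input.
// closeOnFirst = true  : stop once the shortest input is exhausted
// closeOnFirst = false : continue until the longest input is exhausted,
//                        filling gaps from exhausted inputs with undefined
function zipTuples<T>(
    inputs: T[][],
    closeOnFirst = true
): (T | undefined)[][] {
    if (inputs.length === 0) return [];
    const lengths = inputs.map((src) => src.length);
    const n = closeOnFirst ? Math.min(...lengths) : Math.max(...lengths);
    const out: (T | undefined)[][] = [];
    for (let i = 0; i < n; i++) {
        // indexing past the end of an array yields undefined
        out.push(inputs.map((src) => src[i]));
    }
    return out;
}

const inputs = [
    [1, 2, 3],
    [10, 20, 30, 40],
    [100, 200, 300, 400, 500],
];

const untilFirstCloses = zipTuples(inputs);
// [[1, 10, 100], [2, 20, 200], [3, 30, 300]]

const untilAllClose = zipTuples(inputs, false);
// [..., [undefined, 40, 400], [undefined, undefined, 500]]
```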

    PubSub

    import { Channel, PubSub } from "@thi.ng/csp";
    import * as tx from "@thi.ng/transducers";
    
    // define a channel publisher with transducer and topic function applied to each item
    // the input channel receives names and transforms them into indexable objects
    const pub = new PubSub(
        new Channel<any>("users", tx.map((x: string) => ({ type: x.charAt(0), val: x }))),
        (x) => x.type
    );
    
    // create subscriptions (channel + debug consumer)
    // under the hood each topic is a Mult (multiplexed channel)
    // sub channels are automatically named:
    // `<src-id>-<topic>-tap<tapid>` (see below)
    for (let i of "abc") {
        pub.sub(i).consume();
    }
    
    // start processing, then close everything down
    // (pubsubs & mults are closed recursively once the input channel is closed)
    pub.channel().into(["alice", "bert", "bella", "charlie", "arthur"]);
    
    // users-a-tap0 : { type: 'a', val: 'alice' }
    // users-b-tap0 : { type: 'b', val: 'bert' }
    // users-b-tap0 : { type: 'b', val: 'bella' }
    // users-c-tap0 : { type: 'c', val: 'charlie' }
    // users-a-tap0 : { type: 'a', val: 'arthur' }
    // users-b-tap0 done
    // users-c-tap0 done
    // users-a-tap0 done
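
The routing idea behind the example above (a topic function picks the subscribers for each item) can be sketched without channels. The class below is a hypothetical, synchronous stand-in and not the package's `PubSub`: `TopicRouter`, `publish`, `toUser` and `routeNames` are illustrative names, subscribers are plain callbacks rather than tapped channels, and items without a matching topic are simply dropped (the wildcard topic is not modeled).

```typescript
// Illustrative only: NOT the @thi.ng/csp PubSub implementation.
class TopicRouter<T> {
    private subs = new Map<string, Array<(x: T) => void>>();
    private topicFn: (x: T) => string;

    constructor(topicFn: (x: T) => string) {
        this.topicFn = topicFn;
    }

    // Registers a handler for a single topic.
    sub(topic: string, handler: (x: T) => void): void {
        const handlers = this.subs.get(topic) ?? [];
        handlers.push(handler);
        this.subs.set(topic, handlers);
    }

    // Delivers an item to all handlers of its topic.
    // Returns false if nobody is subscribed to that topic.
    publish(x: T): boolean {
        const handlers = this.subs.get(this.topicFn(x));
        if (!handlers) return false;
        handlers.forEach((h) => h(x));
        return true;
    }
}

type User = { type: string; val: string };

// Same transformation as the transducer in the example above.
const toUser = (name: string): User => ({ type: name.charAt(0), val: name });

// Routes names by first letter and collects what each topic received.
function routeNames(names: string[], topics: string[]): Map<string, string[]> {
    const router = new TopicRouter<User>((u) => u.type);
    const received = new Map<string, string[]>();
    for (const t of topics) {
        const bucket: string[] = [];
        received.set(t, bucket);
        router.sub(t, (u) => bucket.push(u.val));
    }
    names.map(toUser).forEach((u) => router.publish(u));
    return received;
}
```

Calling `routeNames(["alice", "bert", "bella", "charlie", "arthur"], ["a", "b", "c"])` groups the names the same way as the log output above: `alice` and `arthur` under `a`, `bert` and `bella` under `b`, `charlie` under `c`.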

    Authors

    Karsten Schmidt

    If this project contributes to an academic publication, please cite it as:

    @misc{thing-csp,
      title = "@thi.ng/csp",
      author = "Karsten Schmidt",
      note = "https://thi.ng/csp",
      year = 2016
    }

    License

    © 2016 - 2022 Karsten Schmidt // Apache Software License 2.0

    Install

    npm i @thi.ng/csp

    Weekly Downloads

    63

    Unpacked Size

    63.5 kB

    Total Files

    16

    Collaborators

    • thi.ng