Wondering what’s next for npm?Check out our public roadmap! »

    rx.wamp
    DefinitelyTyped icon, indicating that this package has TypeScript declarations provided by the separate @types/rx.wamp package

    0.5.0 • Public • Published

    rx.wamp

    A Reactive wrapper library for the autobahn wamp v1/v2 library in the browser/node

    If you have been using below version 0.2.0 please see below for important API changes!

    Installation

    Regular browser

     
    <script type="application/javascript" src="javascripts/lib/autobahn.js"></script>
    <script type="application/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.3.22/rx.lite.js"></script>
    <script type="application/javascript" src="javascripts/rx.wamp.js" ></script>
     

    RequireJS

     
    require(['rx.wamp'], function(Rx) {
     
      //Do stuff
     
    });
     

    NodeJS

     
    var rxwamp = require('rx.wamp');
     
     
    //Do stuff with the autobahn
    //This works for both versions, though realm is only required in version 2
    Rx.Observable
      .fromConnection({url: 'ws://localhost:9000', realm: 'realm1'});
     

    Connection

     
    function newSession(session) {
      console.log("A new session was created");
    }
     
    var connectionSubscription = Rx.Observable.fromConnection("ws://localhost:9000")
        .subscribe(newSession);
        
    //Close our current connection and don't retry
    connectionSubscription.dispose();
     
     

    Subscribing to topics

     
    function validateArgs(value) {
      return value.args && value.args.length > 1;
    }
     
    function getResultValue(value) {
      return value.args[1];
    }
     
    //You may optionally pass in an observer to listen for the subscription completing
    var openObserver = Rx.Observer.create();
     
    var topic = Rx.Observable.subscribeAsObservable(session, "wamp.my.foo", options, openObserver);
     
    //Do all the normal reactive operations on it
    var topicSubscription = topic
      .filter(validateArgs)
      .map(getResultValue)
      .subscribe(console.log);
        
    //Unsubscribe from topic, you will no longer receive updates from this topic
    topicSubscription.dispose();
     
    New in version 0.3!

    You can now pass your connection observable directly into your subscription so that it will persist across sessions

     
    Rx.Observable.subscribeAsObservable(Rx.Observable.fromConnection("ws://myconnectionurl:9090"), "wamp.my.foo", onResult);
     

    New in version 0.5!

    You can now use an even shorter-hand for subscription. These will automatically persist across sessions if you use the fromConnection() overload.

     
    var connection = Rx.Observable.fromConnection("ws://myconnectionurl:9090");
     
    //You can subscribe to as many items as you want
    var subscriber = 
    Rx.Observable.subscriber(connection)
      .to("wamp.my.foo", {}, fooObserver)
      .to("wamp.my.other.foo", function(message) {}, function(ex) {}, function(){});
      
    //You may cancel all of the items with one command as well
    subscriber.dispose();
      
      
      
     

    Publishing to topic

     
    //Version 2
    var published = Rx.Observable.publishAsObservable(session, "wamp.my.foo", [42], {key : "value"}, {});
     
    //Version 1 - Overloads
    Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, true);
    Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, [12345678]);
    Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, [12345678], [87654321]);
     

    Or use them together

     
    //Surfaces a subject which can do both publication and subscription
    var topic = Rx.Observable.fromPubSubPattern(session, "wamp.pubsub.topic", {});
     
    //When the topic successfully subscribes it is surfaced through the 'opened' property
    topic.opened.subscribe(function(){
      console.log("subscribed to topic");
    });
     
    //Errors in publishing are surfaced through the 'errors' property
    topic.errors.subscribe(function(err){
      console.log("There was an error publishing the message");
    });
     
    //subscribe to the observer
    topic.subscribe(observer);
     
    //publish to the observer
    Rx.Observable.generateWithRelativeTime(0, 
      function(x) {return x < 42; },
      function(x) {return x + 1; },
      function(x) {return {args : [x]}; },
      function(x) {return 15; })
      .subscribe(topic);
      
     

    Registering methods

    Note that this will only work in version 2

     
    function endpoint(args, kwargs, details) {
      if (args === undefined || args.length < 1)
        throw new autobahn.Error("No values to sum!");
      else if (args.length > 2) {
        throw new autobahn.Error("Too many values!");
      } else {
        return args[0] + args[1];
      }
    }
     
    function onError(e) {
      //This will get called for all errors.
    }
     
    var registration = 
      Rx.Observable
        .registerAsObservable(session, "wamp.my.add", endpoint, {})
        //This will bubble up all errors that occur either
        //during registration or unregistration.
        .subscribeOnError(onError);
        
     
    //Unregister
    registration.dispose();
     
    New in version 0.3!

    You can now pass your connection observable directly into your registration so that it will persist across sessions

     
    var connection = Rx.Observable.fromConnection({url : myUrl, realm : 'realm1'});
     
    Rx.Observable.registerAsObservable(connection, "wamp.my.add", endpoint, {});
     

    Calling methods

    We can call methods, like the one in the example above, as well.

     
    var caller = session.callAsObservable("wamp.my.add", {});
     
    //Version 2
    caller([2, 3], {})
        .subscribe(function(value){
          // => 5
          console.log("Result was %s", value.args[0]);
        });
     
    //Resubscribing will yield the cached result
    addResult.subscribe(function(value) {
          console.log("Result was %s", value.args[0]);
    });
     
    //Version 1
    caller(2, 3)
      .subscribe(function(value) {});
     

    Authentication

    Currently only available in V1

     
    //In this case the *this* of the onchallenge function will be the session.
    Rx.Observable.authreqAsObservable(session, 
    //Raised when the server challenges the authentication
    function onchallenge(challenge){
      var signature = this.authsign(challenge, "");
      return this.auth(signature);
    }, 
    "blahsomeauthenticationkeyblah", 
    {});
     

    Advanced

    Weather Station Monitor

     
    //listen for sensor readings
    var sensorReadings = Rx.Observable.subscribeAsObservable(session, "weather.sensor");
     
    //A remote service for analyzing our readings, it might be aggregating across several different sources
    var analyzer = Rx.Observable.callAsObservable(session, "weather.forecast.compute");
     
    //Home control settings
    var desiredTemperature = Rx.Observable.subscribeAsObservable(session, "temperature.indoors.desired");
     
    var dailyForecast = 
    sensorReadings
      .map(function(rawValue){
        //Some compatibility so we can transparently use between versions
        return rawValue.kwargs || rawValue.event;
      })
      .throttleFirst(1000) // At most once every second
      .bufferWithTime(1000 * 60 * 60 * 24) //Milliseconds in a day
      .tap(function(readings) {
        //Send these off to our visualizer somewhere on the network
        Rx.Observable.publishAsObservable(session, "weather.visualizer.daily", readings);
      })
      .flatMap(function(readings) {
        //This returns an observable which we will flatMap back into our stream
        return analyzer(readings);
      })
      .publish().refCount();
     
    //Warn of inclement weather coming in  
    dailyForecast
      //only get warnings
      .filter(function(weather) {
        return weather.warnings.length > 0;
      })
      .map(function(weather) {
        //remap only the first warning, don't know why, just cause
        var warning = weather.warnings[0];
        return {type : warning.type, severity : warning.severity, message : "GET TO DA CHOPPA!!"};
      })
      //Publish it to our klaxon service to warn everyone on the block
      .subscribe(Rx.Observable.publishAsObservable.bind(null, session, "weather.warnings.klaxon"));
      
    //Notify the climate control to turn off
    dailyForecast
      .map(function(weather) {
        return weather.temperature.average;
      })
      .combineLatest(desiredTemperature, function(actual, desired) {
        return Math.abs(desired - actual);
      })
      .map(function(difference) {
        return {state : difference > 4};
      })
      .subscribe(Rx.Observable.publishAsObservable.bind(null, session, "indoor.climatecontrol.active"));
      
     
     
    //Create a pipeline of distributed computation
    var adder = session.caller("wamp.my.add");
    var multiplier = session.caller("wamp.my.multiply");
     
    //Somewhat contrived but you get the idea
    var pipeline = 
      adder([2, 3])
        .zip(adder([3, 4]), function(value1, value2) { 
          return [value1.args[0], value2.args[0]];
        })
        .flatMap(function(value) { 
          return multiplier(value[0], value[1]); 
        });
      
      pipeline.subscribe(function(value){
        // =>  (2 + 3) * (3 + 4) = 35
        console.log("Result was %d", value.args[0]);
      })
     
     

    TODO

    • [Major] Implement cross-platform compatibility (currently only works in node)
    • [Major] Bug fixing
    • [Major] Improve API semantics and readability
    • [Major] Push to cdn platforms (npm/bower/cdnjs or microjs).
    • [Minor] Add v1 backward compatibility

    Install

    npm i rx.wamp

    DownloadsWeekly Downloads

    24

    Version

    0.5.0

    License

    MIT

    Last publish

    Collaborators

    • avatar