direape

0.2.16 • Public • Published

website github codeclimate travis npm

DireApe - Distributed App Environment

Direape supplies:

  • message passing, locally and across network
  • creation/destruction of worker threads
  • simple nodejs message relay server
  • unit testing library
  • utility library

Read latest documentation on AppEdit.

It is fully self-contained and is the foundation for REUN.

(function() {
  // `da` is the module object. setupModule() (a hoisted function declaration
  // further down) assigns it and exports it before anything else runs.
  var da; setupModule();

  // Static package metadata exposed on the module object.
  da.info = {
    name: 'direape',
    keywords: ['message passing', 'unit testing'],
    platforms: ['nodejs', 'webworker', 'browser'],
    github: 'solsort/direape'
  };

Message Passing

pid, nid

direape.pid is id of the current process. direape.nid is the id of the main process on this computer/node.

In worker processes, these are set by the parent thread.

Some of the code below is in progress. Cryptographic ids will be used when it becomes more distributed.

  /**
   * Make sure this process has a process id (`da.pid`) and node id (`da.nid`).
   *
   * Does nothing when `da.pid` is already set (e.g. pre-seeded on
   * `self.direape` before this module runs). Otherwise an id is derived as the
   * base64 encoded SHA-256 hash of a freshly generated public key, and the
   * process is treated as the main process of its node (`nid === pid`).
   *
   * @returns {Promise|undefined} a promise when the id is generated
   *   asynchronously through WebCrypto, otherwise undefined.
   */
  function initPid() {
    if(da.pid) {
      return;
    }

    if(!self.crypto && isNodeJs()) {
      da._publicKey = Math.random().toString(); // TODO
      // Set nid as well, like the other two branches do: this is the main
      // process of the node, so both ids are the same.
      da.pid = da.nid = require('crypto')
        .createHash('sha256')
        .update(da._publicKey)
        .digest('base64');
      return;
    }

    return Promise.resolve()
      .then(() => self.crypto.subtle ||
          (self.crypto.subtle = self.crypto.webkitSubtle))
      .then(() => self.crypto.subtle.generateKey(
            {name: 'ECDSA', namedCurve: 'P-521'},
            true, ['sign', 'verify']))
      .then(key => self.crypto.subtle.exportKey('spki', key.publicKey))
      .then(spki => da._publicKey = spki)
      .then(buf => self.crypto.subtle.digest('SHA-256', buf))
      .then(buf => btoa(da.buf2ascii(buf)))
      .then(base64 => da.pid = da.nid = base64)
      .catch(() => {
        // Deliberate best effort: when key generation is unavailable the
        // process still gets a usable, purely local id.
        da.pid = da.nid = 'local';
      });
  }

handle(name, fn, opt)

Register a new message handler in the current thread, given the name of the mailbox. The handler can be called across the network if opt is {"public": true}.

The handler function fn, gets called when a message arrives, with the parameters passed in the message.

  da._handlers = da._handlers || new Map();

  /**
   * Register, replace or remove the message handler for a mailbox.
   *
   * @param {string} name - mailbox name.
   * @param {Function} [fn] - handler; a falsy value removes the handler.
   * @param {Object} [opt] - options stored with the handler, e.g.
   *   `{public: true}`. The object is copied, never modified.
   */
  da.handle = (name, fn, opt) => {
    if(!fn) {
      // Unregister only. Without this return the entry was re-added with an
      // empty `fn`, so one-shot callback mailboxes were never freed.
      da._handlers.delete(name);
      return;
    }
    da._handlers.set(name, Object.assign({}, opt, {fn: fn}));
  };

call(pid, name, args...)

Call a (remote) function, given the process id and the handler name. Returns a promise of the result (which will time out if no result arrives).

  /**
   * Send a request to handler `name` in process `pid`.
   * Any further arguments are passed on to the handler.
   *
   * @returns {Promise} settled by the one-shot reply mailbox created by
   *   makeCallbackHandler (result, error, or call timeout).
   */
  da.call = function(pid, name, ...args) {
    return new Promise((resolve, reject) => {
      var replyMailbox = makeCallbackHandler(resolve, reject);
      send({
        dstPid: pid,
        dstName: name,
        srcPid: da.pid,
        srcName: replyMailbox,
        data: args
      });
    });
  };

emit(pid, name, args...)

Emit a message to a remote handler, - similar to call, except that it will not send a result back.

  /**
   * Fire-and-forget variant of da.call: queue a message for handler `name`
   * in process `pid` without a reply address. Further arguments become the
   * handler's parameters.
   */
  da.emit = function(pid, name, ...args) {
    var msg = {dstPid: pid, dstName: name, data: args};
    send(msg);
  };

Implementation details

  // Messages waiting to be dispatched by the next postman() run.
  da._messageQueue = [];
  // True while a postman() run has been scheduled through da.nextTick.
  var postmanScheduled = false;
  // Milliseconds before a pending da.call(...) is rejected with 'call timeout'.
  var callTimeout = 10000;

  /**
   * Create a one-shot public mailbox that settles a pending call.
   *
   * The mailbox handler receives (result, error): a truthy error rejects,
   * anything else resolves with result. If nothing arrives within
   * `callTimeout` ms the call is rejected with 'call timeout'.
   *
   * @returns {string} the generated mailbox name.
   */
  function makeCallbackHandler(resolve, reject) {
    var mailbox = 'callback' + Math.random().toString().slice(2);

    var settle = (result, error) => {
      // Whichever of reply/timeout comes first wins; tidy up both.
      clearTimeout(timer);
      da.handle(mailbox, null);
      if(error) {
        reject(error);
        return;
      }
      resolve(result);
    };

    var timer = setTimeout(() => settle(null, 'call timeout'), callTimeout);

    da.handle(mailbox, settle, {public: true});
    return mailbox;
  }

  // Queue a message and make sure a postman run is scheduled to dispatch it.
  function send(msg) {
    var queue = da._messageQueue;
    queue.push(msg);
    schedulePostman();
  }

  // Schedule a single postman run on the next tick; repeated calls before
  // that run starts are no-ops.
  function schedulePostman() {
    if(postmanScheduled) {
      return;
    }
    da.nextTick(postman);
    postmanScheduled = true;
  }

  // Dispatch everything queued so far. The queue is swapped for a fresh array
  // first, so messages sent while dispatching wait for the next run.
  function postman() {
    postmanScheduled = false;
    var batch = da._messageQueue;
    da._messageQueue = [];
    for(var msg of batch) {
      processMessage(msg);
    }
  }

  // Deliver a message: handle it here when addressed to this process,
  // otherwise pass it on towards its destination.
  function processMessage(msg) {
    var deliver = msg.dstPid === da.pid ? processLocalMessage : relay;
    deliver(msg);
  }

  /**
   * Run the local handler addressed by a message and send the outcome back.
   *
   * Replies are `[result]` on success and `[null, error]` on failure. Messages
   * flagged `external` may only reach handlers registered with
   * `{public: true}`.
   *
   * @param {Object} msg - message with `dstName`, optional `data` (argument
   *   list), optional `external` flag and optional reply address.
   */
  function processLocalMessage(msg) {
    var handler = da._handlers.get(msg.dstName) || {};

    if(msg.external && ! handler.public) {
      return sendReply(msg, [null, 'no such public function']);
    }

    var fn = handler.fn;
    if(!fn) {
      console.log('no such function:', msg.dstName);
      return sendReply(msg, [null, 'no such function']);
    }

    // Call the handler inside a promise executor, so a synchronous throw
    // becomes an error reply instead of escaping into the postman loop and
    // dropping the rest of the message batch.
    new Promise(resolve => resolve(fn.apply(msg, msg.data)))
      .then(result => sendReply(msg, [result]))
      .catch(e => sendReply(msg, [null, e]));
  }

  // Send `data` back to the sender of `msg`. Messages without a complete
  // reply address (srcPid and srcName) get no reply.
  function sendReply(msg, data) {
    var replyPid = msg.srcPid;
    var replyName = msg.srcName;
    if(replyPid === undefined || replyName === undefined) {
      return;
    }
    send({dstPid: replyPid, dstName: replyName, data: data});
  }

  // Self-test: register a local handler and call it through the message
  // queue, addressed to our own pid.
  da.test('message passing', () => {
    da.handle('da:square', i => i*i);
    return da.call(da.pid, 'da:square', 9)
      .then(i => da.assertEquals(i, 81));
  });

Workers

Only available in browser main thread.

spawn()

Create a new worker thread.

Returns a promise of the process id of the new thread. Code can then be loaded using reun, which exposes reun:require and reun:eval.

  if(isBrowser()) {
    da._children = da._children || new Map();

    /**
     * Start a new worker thread running direape (plus reun).
     *
     * The worker source is prefixed with a bootstrap that pre-assigns the
     * child's pid and nid. The worker's first message is taken as the
     * "ready" signal; later messages are fed into the local message queue.
     *
     * @returns {Promise<string>} resolves with the child pid once the worker
     *   has reported in. Rejects when the worker source cannot be loaded,
     *   the worker cannot be created, or it errors before reporting in
     *   (previously these cases left the promise pending forever).
     */
    da.spawn = () => loadWorkerSource().then(workerSource =>
      new Promise((resolve, reject) => {
        var childPid = da.pid + Math.random().toString(36).slice(2,12);
        var bootstrappedSource = `
          self.direape = {
            pid: '${childPid}',
            nid: '${da.pid}'
          };
        ` + workerSource;

        var workerSourceUrl = URL.createObjectURL(
            new Blob([bootstrappedSource], {type:'application/javascript'}));

        var child = new self.Worker(workerSourceUrl);

        da._children.set(childPid, child);

        // A worker that fails during start-up never sends its ready message:
        // clean up and report the failure.
        child.onerror = (e) => {
          da._children.delete(childPid);
          URL.revokeObjectURL(workerSourceUrl);
          child.terminate();
          reject(e);
        };

        child.onmessage = () => {
          child.onerror = null;
          child.onmessage = (o) => send(o.data);
          URL.revokeObjectURL(workerSourceUrl);
          resolve(childPid);
        };
      }));
  }

isWorker()

Check whether the current process is a worker thread.

  da.isWorker = isWorker;
  /**
   * Check whether the current global scope is a web worker.
   *
   * Tests the global object against `WorkerGlobalScope`. The arity of
   * `postMessage` is not a reliable signal: with the current overloads
   * `window.postMessage.length` is also 1, which made the main thread look
   * like a worker.
   *
   * @returns {boolean}
   */
  function isWorker() {
    return typeof self.WorkerGlobalScope === 'function' &&
      self instanceof self.WorkerGlobalScope;
  }

children()

Get the list of running worker processes.

  if(isBrowser()) {
    // Returns the Map key iterator over the pids of spawned workers
    // (an iterator, not an array).
    da.children = () => da._children.keys();
  }

kill(child-id)

Kill a worker process.

  if(isBrowser()) {
    /**
     * Terminate a worker spawned by this process and forget about it.
     * Unknown or already killed pids are ignored (this used to throw a
     * TypeError), so the call is safe to repeat.
     *
     * @param {string} pid - pid returned by da.spawn().
     */
    da.kill = (pid) => {
      var child = da._children.get(pid);
      if(!child) {
        return;
      }
      child.terminate();
      da._children.delete(pid);
    };
  }

Implementation details

Send the message to other processes. Only called if it shouldn't be handled by the process itself.

  // Pass on a message that is not for this process: workers hand it to their
  // parent, the browser main thread hands it to the matching child worker,
  // and everything else goes to the network.
  function relay(msg) {
    if(isWorker()) {
      self.postMessage(msg);
      return;
    }

    var child = isBrowser() ? da._children.get(msg.dstPid) : undefined;
    if(child) {
      child.postMessage(msg);
      return;
    }
    relayNetwork(msg);
  }

  // Worker-side start-up: feed messages from the parent into the local queue
  // and post an empty message, which the parent's spawn() treats as "ready".
  function initWorker() {
    if(!isWorker()) {
      return;
    }
    self.onmessage = (event) => send(event.data);
    self.postMessage({});
  }

  /**
   * Fetch (once) the source text that new workers run: direape followed by
   * reun.
   *
   * Both files are requested in parallel. A failed load is not cached, so a
   * later spawn() can retry, and the two sources are joined with a newline
   * so the end of the first file cannot swallow the start of the second.
   *
   * @returns {Promise<string>} the combined worker source.
   */
  function loadWorkerSource() {
    if(!da._workerSourcePromise) {
      var direapeUrl = isDev() ? './direape.js' : 'https://unpkg.com/direape@0.2';
      da._workerSourcePromise = Promise
        .all([da.GET(direapeUrl), da.GET('https://unpkg.com/reun@0.2')])
        .then(sources => sources.join('\n'))
        .catch(e => {
          // Forget the failed attempt; the caller still sees the error.
          da._workerSourcePromise = undefined;
          throw e;
        });
    }
    return da._workerSourcePromise;
  }

  // Development flag: read from the DIREAPE_DEV environment variable under
  // NodeJS, otherwise from the global `self.DIREAPE_DEV`.
  function isDev() {
    if(isNodeJs()) {
      return process.env.DIREAPE_DEV;
    }
    return self.DIREAPE_DEV;
  }

  if(isBrowser()) {
    // Self-test: spawn a worker and check that its 'da:status' handler
    // answers with the pid that spawn() reported.
    da.test('workers', () => {
      var childPid;
      return da.spawn()
        .then(pid => childPid = pid)
        .then(() => da.call(childPid, 'da:status'))
        .then(o => da.assertEquals(o.pid, childPid));
    });
  }

Network

Message passing between servers is implemented and should work, but is not used in production yet.

online([boolean/url])

TODO: automatic reconnect

  if(isBrowser()) {

    var websocket;

    /**
     * Query or change the connection to the message relay server.
     *
     * - `online()` returns whether the websocket is currently open.
     * - `online(false)` closes any connection.
     * - `online(url)` / `online(true)` reconnects (to `url`, or to the
     *   default relay for any other truthy value) and returns a promise that
     *   resolves to true when connected and rejects on a socket error.
     */
    da.online = function(url) {
      if(arguments.length === 0) {
        return websocket && websocket.readyState === 1;
      }

      closeWebSocket();

      if(!url) {
        return;
      }
      if(typeof url !== 'string') {
        url = 'wss://direape.solsort.com';
      }

      return new Promise((resolve, reject) => {
        websocket = new WebSocket(url);

        websocket.onopen = () => {
          // Introduce ourselves, so the server can route messages to us.
          var hello = {direapeConnect: da.buf2ascii(da._publicKey)};
          websocket.send(JSON.stringify(hello));
          resolve(true);
        };

        websocket.onerror = (e) => {
          da.emit(da.pid, 'da:socket-error', e);
          reject(e);
        };

        websocket.onmessage = (event) => {
          // Everything arriving from the network is flagged as external.
          var msg = JSON.parse(event.data);
          msg.external = msg.external || true;
          send(msg);
        };
      });
    };
  }

Implementation details

Send the message to other processes. Only called if it shouldn't be handled by the process itself.

  if(isNodeJs()) {
    // Websocket connections of the relay server, keyed by the base64 SHA-256
    // hash of the key each client sent in its `direapeConnect` message.
    var wsClients = new Map();
  }

  // Close the current websocket, if any. Its error handler is replaced by a
  // no-op first.
  function closeWebSocket() {
    if(!websocket) {
      return;
    }
    websocket.onerror = () => null;
    websocket.close();
  }

  // Send a message over the network. The NodeJS server looks up the client
  // connection from the first 44 characters of the destination pid; other
  // environments send through their websocket. Messages without a matching
  // connection are dropped.
  function relayNetwork(msg) {
    if(!isNodeJs()) {
      if(websocket) {
        websocket.send(JSON.stringify(msg));
      }
      return;
    }

    var client = wsClients.get(msg.dstPid.slice(0,44));
    console.log('relay', msg, wsClients.keys(), client);
    if(client) {
      console.log('relay send', msg);
      client.send(JSON.stringify(msg));
    }
  }

Server: nodejs websocket server that relays messages

  if(isNodeJs()) {
    /**
     * Start the relay server on port 8888: static files from the current
     * directory over http, plus a websocket endpoint that relays messages.
     *
     * A client identifies itself with `{direapeConnect: <key>}` and is then
     * reachable under the base64 SHA-256 hash of that key. Every other
     * message is queued for delivery, flagged as external.
     *
     * Websocket input is untrusted: malformed messages are ignored instead
     * of crashing the server, and messages from clients that never
     * identified themselves are still flagged external, so they cannot reach
     * non-public handlers.
     */
    da.startServer = () => {
      var app = require('express')();
      app.use(require('express').static('.'));

      var server = require('http').createServer(app);

      var wss = new (require('ws').Server)({
        perMessageDeflate: false,
        server: server
      });

      wss.on('connection', (ws) => {
        var nid;
        ws.on('message', (raw) => {
          var msg;
          try {
            msg = JSON.parse(raw);
          } catch(e) {
            console.log('ignoring malformed message');
            return;
          }
          if(!msg || typeof msg !== 'object') {
            return;
          }

          if(msg.direapeConnect) {
            if(typeof msg.direapeConnect !== 'string') {
              return;
            }
            if(nid) {
              wsClients.delete(nid);
            }
            nid = require('crypto')
              .createHash('sha256')
              .update(msg.direapeConnect)
              .digest('base64');
            wsClients.set(nid, ws);
          } else {
            // Always truthy, even before the client has identified itself.
            msg.external = nid || true;
            if(msg.dstPid === 'server') {
              msg.dstPid = da.pid;
            }
            send(msg);
          }
        });
        ws.on('close', () => {
          if(nid) {
            wsClients.delete(nid);
          }
        });
      });

      server.listen(8888, () => console.log('started server on port 8888'));
    };
  }

Built-in Handlers

These are the event handlers that are exposed.

  function initHandlers() {

da:list-clients ()

List clients connected to the NodeJS message relay server.

    if(isNodeJs()) {
      // Public handler: replies with the keys of wsClients as an array.
      da.handle('da:list-clients', () => Array.from(wsClients.keys()), {public: true});
    }

da:GET (url)

Get the content of a URL. This is needed by the module loader: WebWorkers may not be able to load the modules themselves due to the web security model, but they are able to request them through the main thread.

    // Registered without {public: true}, so messages flagged `external`
    // cannot call it.
    da.handle('da:GET', da.GET);

da:status ()

Get a status with a timestamp. Can be used to ping for latency and checking whether a thread is alive.

    // Public handler: replies with this process' pid and the current time
    // (Date.now(), milliseconds).
    da.handle('da:status', () => ({pid: da.pid, time: Date.now()}), {public: true});

da:children ()

    // da.children is only defined in the browser main thread, so that is
    // where the handler belongs. The condition used to be inverted, which
    // registered `undefined` everywhere else and nothing in the browser.
    // The key iterator is copied into an array so it survives being sent
    // as a message.
    if(isBrowser()) {
      da.handle('da:children', () => Array.from(da.children()));
    }
  }

Utilities

Various utilities needed by DireApe, and made available as other libraries/code are also using them

ready(fn)

Called when direape is loaded, and has a valid pid.

  // Callbacks waiting for initialisation; set to false once it has finished.
  da._waiting = da._waiting || [];

  /**
   * Run `fn` once direape is initialised: queued while initialisation is
   * still pending, otherwise scheduled on the next tick.
   */
  da.ready = (fn) => {
    if(!da._waiting) {
      da.nextTick(fn);
      return;
    }
    da._waiting.push(fn);
  };

isNodeJs(), isBrowser()

Determine which environment we are in, see also isWorker, under Workers above.

  da.isNodeJs = isNodeJs;
  // True when the global `process.versions.node` is present and truthy.
  function isNodeJs() {
    var proc = self.process || {};
    var versions = proc.versions || {};
    return Boolean(versions.node);
  }

  da.isBrowser = isBrowser;
  // True when the global scope has a truthy `window` property.
  function isBrowser() {
    return Boolean(self.window);
  }

da.log(...) da.trace(...)

Simple logging, used for debug.

  // Log all arguments with console.log and return the second one, so a call
  // can wrap a value inside an expression: `da.log('label', value)`.
  da.log = function(msg, o) {
    console.log(...arguments);
    return o;
  };
  da.trace = da.log;

ajax GET(url)

We need to export http-get from the main thread, to be able to fetch modules with webworkers.

May as well make a generic ajax function, which is not much extra code, and is needed by other code.

(fetch is not universally supported yet )

  if(self.XMLHttpRequest) {

    /**
     * Minimal promise based XMLHttpRequest wrapper.
     *
     * @param {string} url
     * @param {Object} [opt] - `method` (defaults to POST when there is data,
     *   otherwise GET), `data` (request body), `json` (truthy: body is
     *   `JSON.stringify(data)`). The object is not modified, so it can be
     *   reused for several requests.
     * @returns {Promise<string>} response text for 2xx responses; rejects
     *   with the XMLHttpRequest object otherwise.
     */
    da.ajax = (url, opt) =>
      new Promise((resolve, reject) => {
        // Work on a copy: stringifying in place would double-encode the
        // data when the caller reuses the options object.
        var options = Object.assign({}, opt);
        if(options.json) {
          options.data = JSON.stringify(options.data);
        }
        var method = options.method || (options.data ? 'POST' : 'GET');
        var xhr = new XMLHttpRequest();
        xhr.open(method, url);
        xhr.onreadystatechange = function() {
          if(xhr.readyState === 4) {
            if((200 <= xhr.status && xhr.status < 300)
                && typeof xhr.responseText === 'string') {
              resolve(xhr.responseText);
            } else {
              reject(xhr);
            }
          }
        };
        xhr.send(options.data ? options.data : undefined);
      });
    da.GET = da.ajax;

  } else {

    /**
     * GET for environments without XMLHttpRequest: urls starting with '.'
     * are read from the file system, everything else is fetched with the
     * `request` module.
     *
     * @param {string} url
     * @returns {Promise} file content or response body; rejects with an
     *   Error on failure, including non-200 responses (which used to reject
     *   with `undefined`).
     */
    da.GET = (url) => new Promise((resolve, reject) => {
      if(url[0] === '.') {
        resolve(require('fs').readFileSync(url));
        return;
      }
      require('request')(url, (err, res, body) => {
        if(err) {
          reject(err);
        } else if(res.statusCode !== 200) {
          reject(new Error('GET ' + url + ' failed with status ' + res.statusCode));
        } else {
          resolve(body);
        }
      });
    });
  }
  // Self-tests: a successful fetch, and a failing fetch that must reject.
  // Both need network access.
  da.test('GET ok', () => da.GET('https://unpkg.com/direape'));
  da.test('GET fail', () => da.GET('https://unpkg.com/direape/notfound')
      .catch(() => 'error')
      .then(ok => da.assertEquals('error', ok)));

jsonify(obj)

Translate many general JavaScript objects into JSON.stringify-able objects.

Used for preprocessing data that will be passed as messages, if it may contain instances of classes that would mess up the serialisation.

  // Round-trips the value through JSON with jsonReplacer applied to every
  // nested value. It is wrapped in an array (and unwrapped again) so the
  // top-level value goes through the replacer like any other element.
  da.jsonify = o =>
    JSON.parse(JSON.stringify([o], (k,v) => jsonReplacer(v)))[0];

  // Self-test: errors and functions become plain tagged objects, null stays
  // null.
  da.test('jsonify', () => {
    var e = new Error('argh');
    e.stack = 'hello';

    da.assertEquals(da.jsonify(e), {
      $_class: 'Error',
      name:'Error',
      message:'argh',
      stack: 'hello'
    });

    da.assertEquals(da.jsonify(function hello() { }), {
      $_class: 'Function',
      name: 'hello'
    });

    da.assertEquals(da.jsonify(null), null);
  });

  /**
   * Turn one value into something JSON.stringify can represent.
   *
   * Primitives, null, arrays and plain objects are returned unchanged. Any
   * other object or function is replaced by a plain object holding its own
   * enumerable properties, a `$_class` tag with the constructor name, and
   * the whitelisted properties (`stack`, `name`, `message`, `id`, `class`,
   * `value`) where they are defined. ArrayBuffers additionally get their
   * content as `base64`.
   *
   * @param {*} o - value to convert.
   * @returns {*} `o` itself, or its plain object replacement.
   */
  function jsonReplacer(o) {
    var jsonifyWhitelist = ['stack', 'name', 'message', 'id', 'class', 'value'];

    if((typeof o !== 'object' && typeof o !== 'function') ||
        o === null || Array.isArray(o) || o.constructor === Object) {
      return o;
    }

    // NOTE: an earlier branch here copied array-likes into an array, but its
    // result was always overwritten by this line, so it has been removed.
    // Array-like objects are emitted as index-keyed objects, as before.
    var result = Object.assign({}, o);
    if(o.constructor && o.constructor.name && result.$_class === undefined) {
      result.$_class = o.constructor.name;
    }

    if(o instanceof ArrayBuffer) {
      // TODO: also handle actual typed arrays.
      // Convert in chunks: passing a whole large buffer as the argument list
      // of String.fromCharCode.apply exceeds the engine's argument limit and
      // throws a RangeError.
      var bytes = new Uint8Array(o);
      var chunkSize = 0x8000;
      var binary = '';
      for(var offset = 0; offset < bytes.length; offset += chunkSize) {
        binary += String.fromCharCode.apply(
            null, bytes.subarray(offset, offset + chunkSize));
      }
      result.base64 = self.btoa(binary);
    }

    jsonifyWhitelist.forEach(key => {
      if(o[key] !== undefined) {
        result[key] = o[key];
      }
    });
    return result;
  }

nextTick(fn)

  da.nextTick = nextTick;
  // Run `f` asynchronously, as soon as the timer queue allows.
  function nextTick(f) {
    setTimeout(f, 0); // TODO this is relatively slow, could implement with faster version.
  }

slice(arr, i, j)

  // Array.prototype.slice for any array-like (e.g. `arguments`); always
  // returns a real array.
  da.slice = (a, start, end) => Array.prototype.slice.call(a, start, end);

  da.test('slice', () => {
    var sample = [1,2,3];
    da.assertEquals(da.slice(sample).length, 3);
    da.assertEquals(da.slice(sample, 1)[1], 3);
    da.assertEquals(da.slice(sample, 1 , 2).length, 1);
  });

buf2ascii(buf), ascii2buf(str)

Convert buffers to/from strings using latin1 encoding with one byte per char.

  // Map every byte of the buffer to the character with that char code
  // (one char per byte) and join them into a string.
  da.buf2ascii = (buf) => {
    var bytes = new Uint8Array(buf);
    return Array.from(bytes, byte => String.fromCharCode(byte)).join('');
  };

  da.test('buffer conversion', () => {
    var hi = Uint8Array.from([104,105]).buffer;
    da.assertEquals(da.buf2ascii(hi), 'hi');
  });

equals(a,b)

(deep equal for Object/Array, otherwise .equals(..) or direct comparison)

TODO handle cyclic structures (via weak-map) TODO handle iterables

  /**
   * Deep equality.
   *
   * Identical values are equal. Arrays are compared element by element,
   * plain objects key by key (both recursively). Any other object is
   * compared through its own `equals` method when it has one. Everything
   * else is unequal.
   */
  da.equals = (a,b) => {
    if(a === b) {
      return true;
    }

    var bothObjects = typeof a === 'object' && typeof b === 'object' &&
      a !== null && b !== null;
    if(!bothObjects) {
      return false;
    }

    if(Array.isArray(a)) {
      if(!Array.isArray(b) || a.length !== b.length) {
        return false;
      }
      // Index loop on purpose: it also visits holes in sparse arrays.
      for(var i = 0; i < a.length; ++i) {
        if(!da.equals(a[i], b[i])) {
          return false;
        }
      }
      return true;
    }

    if(a.constructor === Object) {
      if(b.constructor !== Object) {
        return false;
      }
      var keysA = Object.keys(a).sort();
      var keysB = Object.keys(b).sort();
      if(!da.equals(keysA, keysB)) {
        return false;
      }
      for(var key in a) {
        if(!da.equals(a[key] ,b[key])) {
          return false;
        }
      }
      return true;
    }

    return typeof a.equals === 'function' ? a.equals(b) : false;
  };

  da.test('equals', () => {
    da.assert(da.equals({a:[1,2],b:3},{b:3,a:[1,2]}));
    da.assert(!da.equals({a:['1',2],b:3},{b:3,a:[1,2]}));
    da.assertEquals({a:[1,2],b:3},{b:3,a:[1,2]});
  });

Testing

  // Milliseconds a single test may run before runTest fails it with 'Timeout'.
  var testTimeout = 5000;

assert(bool)

  // Returns `ok` when it is truthy, otherwise throws an 'AssertError'
  // carrying the falsy value.
  da.assert = (ok) => ok || throwAssert({type: 'truthy', val: ok});

assertEquals(val1, val2)

  // Returns true when the values are deeply equal according to da.equals,
  // otherwise throws an 'AssertError' carrying both values.
  da.assertEquals = (val1, val2) =>
    da.equals(val1, val2) || throwAssert({type: 'equals', vals: [val1, val2]});

testSuite(name)

Sets the current test suite name

  da.testSuite = testSuite;
  // Set the suite name that test() prefixes to the names of tests registered
  // from now on; an empty string disables the prefix.
  function testSuite(str) {
    da._currentTestSuite = str;
  }

test(name, fn)

Add a test case. For asynchronous tests, let the function return a promise. Notice that tests may run in parallel.

  da.test = test;
  // Register a test function. Its name, prefixed with '<suite>:' when a
  // suite is active, is stored on the function as `testName`.
  function test(name, f) {
    var suite = da._currentTestSuite;
    f.testName = suite ? suite + ':' + name : name;
    da._tests = da._tests || [];
    da._tests.push(f);
  }

runTests(testSuites)

Execute the tests given a list of test suites.

  /**
   * Run registered tests in parallel.
   *
   * @param {string|string[]} [modules] - suite name(s); when given, only
   *   tests whose name starts with '<suite>:' are run.
   * @returns {Promise} resolves after logging a summary when all selected
   *   tests pass; rejects when a test fails.
   */
  da.runTests = (modules) => {
    var selected = da._tests;
    if(modules) {
      var suites = Array.isArray(modules) ? modules : [modules];
      selected = selected.filter(
          t => suites.some(suite => t.testName.startsWith(suite + ':')));
    }

    var runs = selected.map(runTest);
    return Promise.all(runs).then(() => {
      var names = selected.map(t => JSON.stringify(t.testName)).join(', ');
      console.log('All tests ok:', names);
    });
  };

Implementation details

  /**
   * Run a single test function and log the outcome.
   *
   * The test passes when it returns, or resolves, within `testTimeout` ms.
   * It fails when it throws, rejects (with any reason, including falsy
   * ones) or times out.
   *
   * @param {Function} t - test function, optionally carrying `testName`.
   * @returns {Promise} resolves when the test passed; rejects with the
   *   failure reason otherwise.
   */
  function runTest(t) {
    var run;
    try {
      run = Promise.resolve(t());
    } catch(e) {
      run = Promise.reject(e);
    }

    var timer;
    var timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error('Timeout')), testTimeout);
    });

    return Promise.race([run, timeout]).then(
      () => {
        // Do not leave the timeout timer pending after the test finished.
        clearTimeout(timer);
        console.log('Test ok', t.testName || '');
      },
      (err) => {
        clearTimeout(timer);
        // The rejection handler itself marks the failure. Testing the reason
        // for truthiness reported rejections with a falsy reason as passes.
        console.log('Test error in "' +
            (t.testName || '') +
            '": ' + (err && err.message));
        if(err && err.assert) {
          try {
            console.log(JSON.stringify(err.assert));
          } catch(e) {
            console.log(err.assert);
          }
        }

        if(err && err.stack) {
          console.log(err.stack);
        }

        throw err;
      });
  }

  /*
     test('must error 1', () => da.assert(false));
     test('must error 2', () => new Promise(() => da.assert(false)));
     test('must error 3', () => new Promise((reject, resolve) => {true;}));
     */

To get the call stack correct, to be able to report assert position, we throw an Error (which includes the stack on many browsers), and enrich it with more information.

  // Throw an Error named 'AssertError' with the assertion details attached
  // as `err.assert`.
  function throwAssert(o) {
    throw Object.assign(new Error('AssertError'), {assert: o});
  }

  // Self-test: a failing assertEquals must throw an 'AssertError' with a
  // stack. NOTE(review): if assertEquals did not throw at all, this test
  // would pass without checking anything.
  da.test('assert',()=>{
    try {
      da.assertEquals(1,2);
    } catch(e) {
      da.assert(e.message === 'AssertError');
      da.assert(typeof e.stack === 'string');
    }
  });

Module Setup / Main

  function setupModule() {

Shims

    // Where `self` does not exist (e.g. NodeJS), alias it to the global
    // object so the rest of the code can use `self` everywhere.
    if(typeof self === 'undefined') {
      global.self = global;
    }

    // Fall back to the prefixed URL implementation when `self.URL` is missing.
    if(!self.URL) {
      self.URL = self.webkitURL;
    }

Make sure we are on https in the browser, otherwise crypto etc. won't work.

    // Redirect plain-http pages to https, except on localhost and on
    // 192.168.* addresses.
    if(isBrowser() &&
        self.location.protocol === 'http:' &&
        self.location.hostname !== 'localhost' &&
        !self.location.hostname.startsWith('192.168.')) {
      self.location.href = self.location.href.replace(/http/, 'https');
    }

Setup / export module

    // Reuse a pre-existing `self.direape` object when there is one (spawn()
    // pre-seeds it with pid and nid in workers), otherwise start empty.
    da = self.direape || {};

    // Export as the global `self.direape` when there is no `module`,
    // otherwise as `module.exports`.
    if(typeof module === 'undefined') {
      self.direape = da;
    } else {
      module.exports = da;
    }

    // Needed this early because da.test(...) is called while the module
    // body is still being evaluated.
    da.test = test;

Define name of testsuite

    // Tests registered from here on are named 'direape:<name>'.
    testSuite('direape');
  }

Initialisation, of the different parts of direape

  // Initialise pid, built-in handlers and worker plumbing in that order,
  // then run the callbacks queued through da.ready() and mark direape as
  // ready by setting da._waiting to false.
  Promise
    .resolve(initPid())
    .then(initHandlers)
    .then(initWorker)
    .then(() => {
      for(var i = 0; i < da._waiting.length; ++i) {
        try {
          da._waiting[i]();
        } catch(e) {
          // One failing callback must not keep the remaining ones from
          // running, nor leave direape stuck in the "not ready" state.
          // Rethrow on a later tick so the error is still reported.
          da.nextTick(() => { throw e; });
        }
      }
      da._waiting = false;
    });

Main entry

  // Stop prefixing test names: tests registered after this point belong to
  // whoever uses the library.
  testSuite('');

  // Once ready: run all tests when the DIREAPE_RUN_TESTS global is set. When
  // executed directly under NodeJS with a `test` argument, run the direape
  // suite and then either start the server (`server` argument) or exit;
  // exit with -1 when the run fails.
  da.ready(() => {
    if(self.DIREAPE_RUN_TESTS) {
      da.runTests();
    }

    var runAsScript = isNodeJs() && require.main === module;
    if(!runAsScript || process.argv.indexOf('test') === -1) {
      return;
    }

    da.runTests('direape')
      .then(() => {
        if(process.argv.indexOf('server') === -1) {
          process.exit(0);
        } else {
          da.startServer();
        }
      })
      .catch(() => process.exit(-1));
  });
})();

License

This software is copyrighted solsort.com ApS, and available under GPLv3, as well as proprietary license upon request.

Versions older than 10 years also fall into the public domain.

Readme

Keywords

none

Package Sidebar

Install

npm i direape

Weekly Downloads

4

Version

0.2.16

License

GPL-3.0

Last publish

Collaborators

  • solsort