ems

Shared Memory Parallelism with Transactional Memory and Extended Memory Semantics

Supported platforms: OSX and Linux; Node.js 4.1, 4.0, 0.12, 0.11, 0.10, and io.js.

Extended Memory Semantics (EMS)

EMS makes shared memory parallelism possible in Node.js (and soon Python).

Extended Memory Semantics (EMS) is a unified programming and execution model that addresses several challenges of parallel programming:

  • Allows any number or kind of processes to share objects
  • Manages synchronization and object coherency
  • Implements persistence to NVM and secondary storage
  • Provides dynamic load-balancing between processes
  • May substitute or complement other forms of parallelism

A modern multicore server has 16-32 cores and over 200GB of memory, equivalent to an entire rack of systems from a few years ago. As a consequence, jobs formerly requiring a Map-Reduce cluster can now be performed entirely in shared memory on a single server without using distributed programming.

EMS extends applications with transactional memory and other fine-grained synchronization capabilities.

EMS implements several different parallel execution models:
  • Fork-Join: Multiprocess execution begins with a single process that creates new processes when needed; those processes then wait for each other to complete.
  • Bulk Synchronous Parallel: Execution begins with each process starting the program at the main entry point and executing all the statements (see the startup sketch after this list).
  • User Defined: Parallelism may include ad-hoc processes and mixed-language applications.
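
A minimal Bulk Synchronous Parallel startup sketch. It assumes the EMS initializer takes the number of processes as its argument (as in the transaction example later in this document) and that ems.myID reports each process's rank:

var nProcs = parseInt(process.argv[2] || '2', 10);
var ems = require('ems')(nProcs);   // Fork the worker processes and attach shared memory
console.log('Process ' + ems.myID + ' of ' + nProcs + ' entered main');
ems.barrier();                      // Collective: wait here until every process arrives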

EMS operations may be performed on any JSON data type; read-modify-write operations may use any combination of JSON data types, producing results identical to like operations on ordinary data.

All basic and atomic read-modify-write operations are available in all concurrency modes; however, collectives are not currently available in user defined modes.

  • Basic Operations: Read, write, readers-writer lock, read full/empty, write empty/full
  • Primitives: Stacks, queues, transactions
  • Atomic Read-Modify-Write: Fetch-and-Add, Compare-and-Swap (see the sketch after this list)
  • Collective Operations: All basic OpenMP collective operations are implemented in EMS: dynamic, block, guided, and static loop scheduling, barriers, master and single execution regions
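
A hedged sketch of the atomic read-modify-write operations named above. The allocation size and initial value here are illustrative assumptions; later examples in this document elide the real allocation arguments as ems.new(...):

var shared = ems.new(100);       // A shared array visible to all processes
shared.writeXF(0, 0);            // Force-write a 0 and mark the element Full
shared.faa(0, 1);                // Fetch-and-Add: atomically increment element 0
var old = shared.cas(0, 1, 42);  // Compare-and-Swap: set element 0 to 42 only if it is still 1
ems.barrier();                   // Collective: all processes rendezvous here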

Map-Reduce is often demonstrated using word counting because each document can be processed in parallel, and the results of each document's dictionary reduced into a single dictionary. This EMS implementation also iterates over documents in parallel, but it maintains a single shared dictionary across processes, atomically incrementing the count of each word found. The final word counts are sorted and the most frequently appearing words are printed with their counts.
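
The loop below assumes a shared wordCounts array has already been allocated. A hedged setup sketch follows; the option names (dimensions, heapSize, useMap, dataFill) are assumptions modeling the key-mapped EMS arrays described later in this document, and doc_path and maxNKeys are illustrative:

var fs = require('fs');
var ems = require('ems')(parseInt(process.argv[2] || '8', 10));
var doc_path = process.argv[3];   // Directory of documents to scan (hypothetical argument)
var maxNKeys = 10000000;          // Assumed capacity: maximum number of distinct words
var wordCounts = ems.new({
    dimensions: [maxNKeys],       // One counter per distinct word
    heapSize: maxNKeys * 10,      // Assumed heap space for the word strings themselves
    useMap: true,                 // Index elements by key (the word) instead of by integer
    dataFill: 0                   // Counters start at zero
});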

var file_list = fs.readdirSync(doc_path);
var splitPattern = /[ \n,\.\\/_\-\<\>:\;\!\@\#\$\%\&\*\(\)=\[\]|\"\'\{\}\?\—]/;
// Iterate over all the files in parallel
ems.parForEach(0, file_list.length, function (fileNum) {
    var text = fs.readFileSync(doc_path + file_list[fileNum], 'utf8');
    var words = text.replace(/[\n\r]/g, ' ').toLowerCase().split(splitPattern);
    //  Sequentially iterate over all the words in one document
    words.forEach(function (word) {
        wordCounts.faa(word, 1);  //  Atomically increment the count of times this word was seen
    });
});

Parallel sorting of the word counts is implemented as a multi-step process:

  1. The bag of words is processed by all the processes, each process building an ordered list of the most common words it encounters.
  2. The partial lists from all the processes are concatenated into a single list.
  3. The list of the most common words seen is sorted and truncated.

var local_sort_len = 100;  // Assumed length of each process's top-words list; not defined in the original excerpt
var biggest_counts = new Array(local_sort_len).fill({"key": 0, "count": 0});
ems.parForEach(0, maxNKeys, function (keyN) {
    var key = wordCounts.index2key(keyN);
    if (key) {
        //  Perform an insertion sort of the new key into the biggest_counts
        //  list, deleting the last (smallest) element to preserve length.
        var keyCount = wordCounts.read(key);
        var idx = local_sort_len - 1;
        while (idx >= 0  &&  biggest_counts[idx].count < keyCount) { idx -= 1; }
        var newBiggest = {"key": key, "count": keyCount};
        if (idx < 0) {
            //  The new key is the largest seen so far: prepend it and drop the smallest
            biggest_counts = [newBiggest].concat(biggest_counts.slice(0, biggest_counts.length - 1));
        } else {
            //  Insert the new key after position idx and drop the smallest
            var left = biggest_counts.slice(0, idx + 1);
            var right = biggest_counts.slice(idx + 1);
            biggest_counts = left.concat([newBiggest].concat(right)).slice(0, -1);
        }
    }
});
//  Concatenate all the partial (one per process) lists into one list.
//  "stats" is a shared EMS array assumed to have been allocated earlier.
stats.writeXF('most_frequent', []);  // Initialize the list and mark it Full
ems.barrier();  // Wait for all processes to finish initialization
stats.writeEF('most_frequent', stats.readFE('most_frequent').concat(biggest_counts));
ems.barrier();  // Wait for all processes to finish the merge
ems.master(function () {  //  Sort & print the list of words; only one process is needed
    biggest_counts = stats.readFF('most_frequent');
    biggest_counts.sort(function (a, b) { return b.count - a.count; });
    //  Print only the first "local_sort_len" items -- assume the worst case
    //  of all the largest counts being discovered by a single process
    console.log("Most frequently appearing terms:");
    for (var index = 0;  index < local_sort_len;  index += 1) {
        console.log(index + ': ' + biggest_counts[index].key + "   " + biggest_counts[index].count);
    }
});

This section reports the results of running the Word Count example on documents from Project Gutenberg. 2,981,712,952 words in several languages were parsed, totaling 12,664,852,220 bytes of text.

The performance of this program was measured on an Amazon EC2 c4.8xlarge instance (132 ECUs, 36 vCPUs, 2.9 GHz Intel Xeon E5-2666 v3, 60 GiB memory). The leveling off of scaling around 16 cores, despite the presence of ample work, may be related to the use of non-dedicated hardware: half of the 36 vCPUs are presumably HyperThreads or otherwise shared resources. AWS instances are also bandwidth limited to EBS storage, where our Gutenberg corpus is stored.

A benchmark similar to STREAM measures the maximum rate at which EMS double precision floating point operations can be performed on the same c4.8xlarge instance (132 ECUs, 36 vCPUs, 2.9 GHz Intel Xeon E5-2666 v3, 60 GiB memory).

The following example uses EMS transactional memory to atomically update two account balances while simultaneously preventing updates to the users' customer records.

var ems = require('ems')(process.argv[2]);       // Initialize EMS
var customers = ems.new(...);                    // Allocate EMS memory for customer records
var accounts  = ems.new(...);                    // Allocate accounts
...
// Start a transaction involving Bob and Sue
var transaction = ems.tmStart( [ [customers, 'Bob Smith', true],  // Read-only:  Bob's customer record
                                 [customers, 'Sue Jones', true],  // Read-only:  Sue's customer record
                                 [accounts, 'Bob Smith'],         // Read-Write: Bob's account balance
                                 [accounts, 'Sue Jones'] ] );     // Read-Write: Sue's account balance
// Transfer the payment and update the balances
var bobsBalance = accounts.read('Bob Smith');               // Read the balance of Bob's account
accounts.write('Bob Smith', bobsBalance - paymentAmount);   // Deduct the payment and write the new balance back
var suesBalance = accounts.read('Sue Jones');               // Read the balance of Sue's account
accounts.write('Sue Jones', suesBalance + paymentAmount);   // Add the payment to Sue's account
// Commit the transaction, or abort it if Bob's funds are insufficient
if (bobsBalance >= paymentAmount) {                         // Test for overdraft
    ems.tmEnd(transaction, true);                           // Sufficient funds, commit transaction
} else {
    ems.tmEnd(transaction, false);                          // Not Sufficient Funds, roll back transaction
}

The micro-benchmarked raw transactional performance and the performance in the context of a workload are measured separately. The experiments were run on an Amazon EC2 c4.8xlarge instance (132 ECUs, 36 vCPUs, 2.9 GHz Intel Xeon E5-2666 v3, 60 GiB memory).

Six EMS arrays are created, each holding 1,000,000 numbers. During the benchmark, 1,000,000 transactions are performed; each transaction involves 1-5 randomly selected elements of randomly selected EMS arrays. The transaction reads all the elements and performs a read-modify-write operation involving at least 80% of them. After all the transactions are complete, the array elements are checked to confirm all the operations have occurred.
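
A hedged sketch of one such benchmark transaction. The array setup, random selection, and 80% update rule are modeled from the description above; duplicate element selections are not de-duplicated in this sketch:

var SIZE = 1000000;
var arrays = [];
for (var a = 0;  a < 6;  a += 1) { arrays.push(ems.new(SIZE)); }  // Six shared arrays
ems.parForEach(0, SIZE, function (i) {              // Pre-fill every element with 0
    arrays.forEach(function (arr) { arr.writeXF(i, 0); });
});

function randomTransaction() {
    var nElems = 1 + Math.floor(Math.random() * 5);      // 1-5 elements per transaction
    var refs = [];
    for (var n = 0;  n < nElems;  n += 1) {
        var arr = arrays[Math.floor(Math.random() * arrays.length)];
        refs.push([arr, Math.floor(Math.random() * SIZE)]);  // Read-write by default
    }
    var tm = ems.tmStart(refs);                          // Acquire all the elements atomically
    refs.forEach(function (ref, n) {
        var val = ref[0].read(ref[1]);                   // Read every element
        if (n < Math.ceil(nElems * 0.8)) {               // Update at least 80% of them
            ref[0].write(ref[1], val + 1);
        }
    });
    ems.tmEnd(tm, true);                                 // Commit the transaction
}

ems.parForEach(0, 1000000, randomTransaction);  // Perform the million transactions in parallel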

The parallel process scheduling model used is block dynamic (the default), where each process is responsible for successively smaller blocks of iterations. The execution model is bulk synchronous parallel: each process enters the program at the same main entry point and executes all the statements in the program. forEach loops have their normal semantics of performing all iterations, while parForEach loops are distributed across processes, each process executing only a portion of the total iteration space, as sketched below.
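
A minimal sketch of the two loop semantics, assuming a BSP launch as in the earlier examples:

ems.parForEach(0, 1000, function (i) {
    // Each process performs only its share of the 1000 iterations,
    // handed out in successively smaller blocks (block dynamic, the default)
});
['a', 'b', 'c'].forEach(function (x) {
    // Ordinary forEach: every process performs all three iterations
});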


Immediate Transactions: Each process generates a transaction on integer data then immediately performs it.

Transactions from a Queue: One process generates the individual transactions and appends them to a work queue from which the other processes get work. Note: As the number of processes increases, the process generating the transactions and appending them to the work queue is starved out by the processes performing transactions, naturally maximizing the data access rate.

Immediate Transactions on Strings: Each process generates a transaction appending to a string, and then immediately performs the transaction.

Measurements:
  • Elem. Ref'd: Total number of elements read and/or written
  • Table Updates: Number of different EMS arrays (tables) written to
  • Trans. Performed: Number of transactions performed across all EMS arrays (tables)
  • Trans. Enqueued: Rate at which transactions are added to the work queue (only 1 generator process in these experiments)

EMS internally stores tags that are used for synchronization of user data, allowing synchronization to happen independently of the number or kind of processes accessing the data. The tags can be thought of as being in one of three states: Empty, Full, or Read-Only. The EMS intrinsic functions enforce atomic access through automatic state transitions, as sketched below.
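
A minimal sketch of the tag transitions, using the same full/empty operations that appear in the word count example above; "mailbox" is a hypothetical one-element shared array:

var mailbox = ems.new(1);
mailbox.writeXF(0, 0);           // Force-write regardless of tag and leave the element Full
var value = mailbox.readFE(0);   // Wait for Full, read, and leave the element Empty
mailbox.writeEF(0, value + 1);   // Wait for Empty, write, and mark the element Full again
var peek = mailbox.readFF(0);    // Wait for Full, read, and leave the element Full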

An EMS array may be indexed directly using an integer, or using a key-index mapping from any primitive type. When a map is used, the key and the data itself are updated atomically, as sketched below.
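
A brief sketch of the two indexing styles; the allocation options follow the same assumed key-mapped pattern as the word count setup above:

var dict = ems.new({dimensions: [1000], heapSize: 100000, useMap: true});
dict.writeXF('apple', 10);      // Keyed access: the key-to-index mapping and the write happen atomically
dict.faa('apple', 1);           // Atomic increment through the same key mapping
var key = dict.index2key(0);    // Integer access: recover whatever key, if any, landed at index 0
if (key) { console.log(key, dict.read(key)); }  // Read the element back through its key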



EMS memory is an array of JSON primitive values (Number, Boolean, String, or Undefined) accessed using atomic operators and/or transactional memory. Safe parallel access is managed by passing through multiple gates: First mapping a key to an index, then accessing user data protected by EMS tags, and completing the whole operation atomically.


EMS data tag transitions and atomic operations: F=Full, E=Empty, X=Don't Care, RW=Readers-Writer lock (number of current readers), CAS=Compare-and-Swap, FAA=Fetch-and-Add

For a more complete description of the principles of operation, visit the EMS web site.

Complete API reference


Because all systems are already multicore, parallel programs require no additional equipment, system permissions, or application services, making it easy to get started. The reduced complexity of lightweight threads communicating through shared memory is reflected in a rapid code-debug cycle for ad-hoc application development.

EMS is available as an NPM package. EMS itself has no external dependencies, but it does require compiling native C++ functions using node-gyp, which is also available as an NPM package (sudo npm install -g node-gyp).

The native C++ parts of EMS depend on other NPM packages to compile and load. Specifically, the Foreign Function Interface (ffi), C-to-V8 symbol renaming (bindings), and the native addon abstraction layer (nan) are also required to compile EMS.

npm install ems

Download the source code, then compile the native code:

git clone https://github.com/SyntheticSemantics/ems.git
cd ems
npm install

To use this EMS development build to run the examples or tests, set up a global npm link to the current build:

sudo npm link ../ems

On a Mac and most Linux distributions EMS will "just work", but some Linux distributions restrict access to shared memory. The quick workaround is to run jobs as root; a long-term solution will vary with the Linux distribution.

Run the work queue driven transaction processing example on 8 processes:

npm run example

Or manually via:

cd Examples
node concurrent_Q_and_TM.js 8

Running all the tests with 8 processes:

npm run test      # Alternatively: npm test 
cd Tests
rm -f EMSthreadStub.js   # Do not run the machine generated script used by EMS 
for test in `ls *js`; do node $test 8; done

As of 2016-01-24, Mac/Darwin and Linux are supported. A Windows port pull request is welcome!

EMS 1.0 uses Nan for long-term Node.js support, and we continue to develop on OSX and Linux via Vagrant. Version 1.x is feature-frozen, but support continues as we back-patch fixes and features on our way to version 2.0.

EMS 2.0 is in the planning phase; the new API will more tightly integrate with ES6, Python, and other dynamically typed languages, making atomic operations on persistent memory more transparent. These new language features also simplify integration with legacy applications. The types of persistent and Non-Volatile Memory (NVM) EMS was designed for, such as pmem.io, will be introduced into servers in the next few years; we hope to leverage their work and future-proof EMS against architectural shifts.

EMS is BSD licensed; other commercial and open source licenses are available.

Visit the EMS web site

Download the NPM

Get the source at GitHub

Jace A Mogill specializes in FPGA/Software Co-Design, recently embedding an FPGA emulation of an ASIC into Python and designing a hardware accelerator for Python, Javascript, and other languages. He has over 20 years of experience optimizing software for distributed, multi-core, and hybrid computer architectures. He regularly responds to email at mogill@synsem.com.