Extended Memory Semantics

EMS internally stores tags that are used for synchronization of user data, allowing synchronization to happen independently of the number or kind of processes accessing the data. The EMS primitives enforce atomic access using automatic state transitions, and higher level intrinsics like stacks, queues, and transactional memory are built on the EMS primitives.


EMS memory is an array of JSON values accessed using atomic operators and/or transactional memory. Safe parallel access is managed by passing through multiple gates: first mapping a key to an index, then accessing user data protected by EMS tags, and finally completing the whole operation atomically.

EMS Data Tag Transitions & Atomic operations: F=Full, E=Empty, X=Don't Care, RW=Readers-Writer lock (# of current readers), CAS=Compare-and-Swap, FAA=Fetch-and-Add

EMS Class & Array Methods

Operations which inspect or affect the global EMS state are performed using class methods of the EMS object returned by the require statement. Operations that modify the data and tags are performed using methods of the EMS array being operated on.

Module Require
REQUIRE require('ems') ( nThreads [, threadAffinity [, parallelType [, contextName ] ] ] )
SYNOPSIS Initialize the EMS module, starting all the other threads. Thread identity and processor affinity are assigned when the thread is created.

ARGUMENTS nThreads <Number> Total number of threads the job should use.
threadAffinity <Boolean> (Optional, Default = false, Affects only Linux.) Set the scheduling affinity of each thread to its own core, assigning over-subscribed threads in a round-robin fashion.
parallelType <String> (Optional, Default=bsp) One of bsp, fj, or user. Execution model: bsp uses EMS' built-in Bulk Synchronous Parallel execution, fj uses EMS' built-in Fork-Join execution, and user creates no parallelism.
contextName <String> (Optional, Default=anonymous) Unique name of parallel context being initialized, required to distinguish between multiple EMS parallel programs running simultaneously on the same system.

RETURNS ems = {
            nThreads : Number,  // Number of threads executing
            myID : Number       // This thread's ID 0..nThreads-1
        }
EXAMPLES ems = require('ems')(process.argv[2]) Use the first command line argument as the number of threads:
node foo.js 4 executes using 4 threads.
ems = require('ems')() Run on one thread.
ems = require('ems')(process.argv[2], false, 'fj') Use the first command line argument as the number of threads, do not set the affinity of the threads to a specific CPU, and execute using fork-join parallelism.
Create a new EMS Array
CLASS METHOD ems.new( [ nElements [, heapSize [, fileName] ] ] )
  ems.new( emsArrayDescriptor )
SYNOPSIS Attach to an existing EMS array or create a new one. Creating a new EMS memory region (that is, not using an existing one) implies a barrier, whereas attaching to an existing EMS file blocks until the file exists.

ARGUMENTS nElements <Number> (Optional, Default is 1) Maximum number of elements in the EMS array or map.
<Array> An array of dimensions of a multi-dimensional array. A 100×30×50 cube is described by [100, 30, 50].
heapSize <Number> (Optional, Default is 0) Maximum number of bytes reserved for strings, arrays, maps, and object elements in this array. The actual amount of memory allocated for use is rounded up to the nearest power of 2 (until a better memory allocator is implemented). Additional memory is allocated for bookkeeping. Memory block size is defined in ems_alloc.h: EMS_MEM_BLOCKSZ but should be configurable at create time.
fileName <String> (Optional, Default is anonymous) Fully qualified file name of the file to use as a persistent backing store for the EMS array, tags, and bookkeeping information.
emsArrayDescriptor <Object> (Alternative to the scalar arguments) A complete EMS Array descriptor may be passed as the only argument instead of scalar arguments.
emsArrayDescriptor = {
    dimensions  : 100,       // Required: Max # elements in EMS array
                             // Integer for 1-D, or array of dims [x, y, z]
    heapSize    : 100000,    // Optional, default=0: Space, in bytes, for
                             // strings, maps, and objects. Rounded up to nearest power of two
    mlock       : 0,         // Optional, default=0%: % of EMS memory to lock into RAM
    useMap      : true,      // Optional, default=false: Map keys to indexes
    useExisting : true,      // Optional, default=false:
                             // Preserve data if a file already exists
    persist     : true,      // Optional, default=true:
                             // Preserve the file after threads exit
    doDataFill  : false,     // Optional, default=false: Initialize memory
    dataFill    : undefined, // Optional, If this property is defined,
                             // the EMS memory is filled with this value
    setFEtags   : 'full',    // Optional, If defined, set 'full' or 'empty'
    filename    : '/path/to/file'  // Optional, default=anonymous:
                             // Path to the persistent file of this array
}

RETURNS <EMS Array>
EXAMPLES var foo = ems.new(nItems) Create a new non-persistent shared memory EMS array with no heap space. Scalar data (Number, Boolean, Undefined) may be stored in this array, but not strings, arrays, or objects.
var foo = ems.new(nItems, size, '/tmp/EMS_foo') Create a file-backed EMS shared memory space with the filename /tmp/EMS_foo. In addition to the scalar storage, size bytes are reserved for strings, arrays, objects, and maps.
var x = ems.new(nItems, size) Create a new non-persistent shared memory EMS array that has space for strings totaling size bytes.
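The sizing rules above (a dimensions array describing a multi-dimensional array, and heapSize being rounded up to a power of 2) can be illustrated with a minimal sketch. The helpers totalElements and roundUpPow2 are hypothetical and not part of the EMS API; returning 0 for a heapSize of 0 is an assumption, and the bookkeeping memory EMS adds is not modeled.

```javascript
// Hypothetical helpers for illustration only; not part of the EMS API.

// Number of elements described by nElements: a plain number for 1-D,
// or the product of the dimensions for a multi-dimensional array.
function totalElements(dimensions) {
    if (Array.isArray(dimensions)) {
        return dimensions.reduce(function (count, dim) { return count * dim; }, 1);
    }
    return dimensions;
}

// Round a requested heap size up to a power of 2.
// Assumption: a request of 0 bytes stays 0 (no heap space).
function roundUpPow2(nBytes) {
    if (nBytes <= 0) return 0;
    let size = 1;
    while (size < nBytes) size *= 2;
    return size;
}

console.log(totalElements([100, 30, 50]));  // 150000
console.log(roundUpPow2(100000));           // 131072
```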
Parallel Region (Fork-Join execution only)
CLASS METHOD ems.parallel( [args, ...] func )
SYNOPSIS When using fork-join execution, the master thread enters a parallel region by executing func once from each process. The master process first starts all the other processes running the function asynchronously, then executes the function itself synchronously. All processes join (perform a barrier) after completion of the work function. The results of the function are discarded. Global variables on each node are persistent between parallel regions.
ARGUMENTS args <Any> Zero or more arguments to be passed to the function.
func <Function> Function to be executed once on every thread. The optional arguments are used when calling the function. Return value is ignored.

EXAMPLES ems.parallel( doWork ) The function doWork is executed by every thread.
ems.parallel( foo, "Smith", 123, doWork ) The function call doWork(foo, "Smith", 123) is performed once by each process.
Parallel Loops
CLASS METHOD ems.parForEach( first, last, func [, scheduling [, minChunk] ] )
SYNOPSIS Parallel loop execution, distributing the iterations among all the threads. The function is invoked with one argument, the current iteration number. Iterations are divided among the threads according to the scheduling method specified. Parallel for loops must not be nested. The system should check for this and fall back on serial execution. A barrier is implied at the end.

ARGUMENTS first <Number> Index to start iterating
last <Number> Index to stop iterating (non-inclusive).
func <Function> Loop body, only input argument is current loop index:
function foo(idx) {...}
scheduling <String> guided [minChunk]: Decreasing amounts of work are assigned to each task until minChunk iterations per thread is reached. Load balancing occurs when new chunks are assigned to threads.
static: Equal number of iterations are given to each thread, no dynamic load balancing is performed.
dynamic: All threads share one index which is atomically incremented by 1 after each iteration. Provides ideal load balancing at the cost of high per-iteration overhead.
minChunk <Number> (Optional, only used when scheduling='guided', default=1) Minimum number of iterations assigned to a single thread.

EXAMPLES ems.parForEach(0, nItems-1, func) Execute the func function nItems-1 times with indexes 0..nItems-2, inclusive, because last is non-inclusive.
ems.parForEach(10000, 20000, func, 'guided', 200) Distribute iterations numbered 10,000 through 19,999 using the guided method with a minimum chunk size of 200 iterations.
ems.parForEach(0, 999, func, 'static') Execute func() 999 times with indexes 0..998 inclusive. Distribute the iterations evenly across threads in contiguous blocks.
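As an illustration of the static schedule, the following minimal sketch splits the half-open range [first, last) into one contiguous block per thread. The helper staticBlock and its rounding rule (leftover iterations go to the lowest thread IDs) are assumptions for illustration; the partitioning EMS performs internally may differ.

```javascript
// Hypothetical helper, not part of the EMS API: compute the contiguous block
// of iterations thread `myID` would run under a static schedule.
// `last` is non-inclusive, matching the parForEach argument description.
function staticBlock(first, last, nThreads, myID) {
    const total = Math.max(0, last - first);
    const base = Math.floor(total / nThreads);  // Iterations every thread gets
    const extra = total % nThreads;             // Leftovers go to the lowest IDs
    const start = first + myID * base + Math.min(myID, extra);
    const count = base + (myID < extra ? 1 : 0);
    return { start: start, end: start + count };  // Half-open: [start, end)
}

// Blocks for 10 iterations divided among 4 threads
for (let id = 0; id < 4; id++) {
    const block = staticBlock(0, 10, 4, id);
    console.log('thread ' + id + ': [' + block.start + ', ' + block.end + ')');
}
// thread 0: [0, 3)
// thread 1: [3, 6)
// thread 2: [6, 8)
// thread 3: [8, 10)
```

A dynamic schedule would instead have every thread repeatedly claim the next index from one shared counter, which balances load better but costs one atomic operation per iteration.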
Barrier Synchronization
CLASS METHOD ems.barrier()
SYNOPSIS All the threads must reach the same barrier before proceeding. Failure to call a barrier from every process will result in deadlock. If called outside a parallel region, a barrier has no effect.

EXAMPLES ems.barrier() All threads reach the barrier before proceeding.
Critical Region
CLASS METHOD ems.critical( func )
SYNOPSIS Perform function func() mutually exclusive of other threads. Execution is serialized through all critical regions, which are not named; named regions would be more like OpenMP.

ARGUMENTS func <Function> Function to perform sequentially.

EXAMPLES ems.critical( function() {
             // Use a shared resource
         } )
A shared resource is accessed sequentially, but in no particular order.
Execute on Master Thread Only
CLASS METHOD ems.master( func )
SYNOPSIS Perform function func() only on thread 0. Implies a barrier.

ARGUMENTS func <Function> Function to perform only by task 0.

EXAMPLES ems.master( function() { console.log("Only task 0") } ) Console logging performed only by task 0
Execute once on any thread
CLASS METHOD ems.single( func )
SYNOPSIS Perform function func() only once by the first thread to reach the statement. Implies a barrier.

ARGUMENTS func <Function> Function to be performed once.

EXAMPLES ems.single( function() { console.log("Only first task") } ) Console logging is performed only once.
Print Diagnostic Message
CLASS METHOD ems.diag( message )
SYNOPSIS Print a diagnostic message to the console with a prefix indicating the task ID.

ARGUMENTS message <String> Text of message to print to the console

EXAMPLES ems.diag( "Hello, world!" ) EMS 3: Hello, world! appears on console

EMS Array Methods

Intrinsic Atomic Operations (AMOs)
Read EMS Memory
ARRAY METHOD emsArray.read( index )
emsArray.readFE( index )
emsArray.readFF( index )
emsArray.readRW( index ), emsArray.releaseRW( index )

SYNOPSIS The read family of EMS memory operations return the data stored in an EMS array element. The value may be any JSON type.
read
Immediately and unconditionally returns the stored value, ignoring the tag state. Reading an uninitialized mapped index returns undefined, regardless of the default value that would be returned by the other read operations (readFE, readFF, readRW), cas, or faa.
readFE
Blocks until the data element is full, then atomically reads the value and marks it empty.
readFF
Blocks until the data element is full, then atomically reads leaving it full. This allows safe read access of data which may be updated simultaneously. readFF ensures mutual exclusion, and will block if the data is already under a Readers-Writer lock.
readRW, releaseRW
Blocks until the data element is full or already under a Readers-Writer lock, then increments the Readers-Writer reference count. The function emsArray.releaseRW() decrements the reference count, restoring the state to Full if no readers remain.
ARGUMENTS index <Number | String> Index in the EMS array of data to read

RETURNS read, readFE, readFF, readRW : < Number | Boolean | String | Undefined | Array | Object >
releaseRW : <Number> Number of pending readers sharing the lock.
EXAMPLES var n = histogram.read(3) Read the value in bin 3 of the histogram.
var word = dictionary.read(idx) Read dictionary word at index/key idx in the dictionary array.
var x = arr.readFE(idx) Block until the element at index idx is full, atomically read the value and mark it empty.
var x = arr.readFF(idx) Block until the element at index idx is full, atomically read the value and leave it full.
var x = arr.readRW(idx) Acquire a shared-exclusive readers-writer lock.
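The reference counting behind readRW and releaseRW can be pictured with a toy, single-process model. This is only a sketch: the class RWCell and its tryReadRW method are hypothetical, and where the real readRW blocks, the model returns { ok: false } instead.

```javascript
// Toy model of the Readers-Writer tag on one element; not EMS internals.
class RWCell {
    constructor(value) {
        this.value = value;
        this.tag = 'full';   // 'full', 'empty', or 'rw'
        this.readers = 0;    // Number of readers currently sharing the lock
    }
    // Model of readRW: allowed when full or already under an RW lock
    tryReadRW() {
        if (this.tag !== 'full' && this.tag !== 'rw') return { ok: false };
        this.tag = 'rw';
        this.readers += 1;
        return { ok: true, value: this.value };
    }
    // Model of releaseRW: returns the number of readers still holding the lock
    releaseRW() {
        if (this.readers === 0) throw new Error('no reader holds the lock');
        this.readers -= 1;
        if (this.readers === 0) this.tag = 'full';  // Last reader restores Full
        return this.readers;
    }
}

const shared = new RWCell('config');
shared.tryReadRW();               // First reader: full -> rw, count 1
shared.tryReadRW();               // Second reader shares the lock, count 2
console.log(shared.releaseRW());  // 1
console.log(shared.releaseRW());  // 0
console.log(shared.tag);          // full
```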
Write EMS Memory
ARRAY METHOD emsArray.write( index, value )
emsArray.writeXE( index, value )
emsArray.writeXF( index, value )
emsArray.writeEF( index, value )

SYNOPSIS Write a value to an element of an EMS array.
write
Immediately and unconditionally writes the value to memory. This operation does not honor or modify the full/empty tag status.
writeXE
Unconditionally and atomically writes the value to the data element and marks the element empty.
writeXF
Unconditionally and atomically writes the value to the data element and marks the element full.
writeEF
Blocks until the element is empty, and then atomically writes the value and marks the element full.
ARGUMENTS index <Number | String> Index in the EMS array of data to write
value <Any> Primitive value to store in the array at element numbered index.

EXAMPLES histogram.write(idx, 0) Initialize the value of histogram[idx] to 0.
dictionary.write(idx, "Hello") Add the string "Hello" to the EMS array dictionary at index idx.
arr.writeXE(i, undefined) Purge the memory at index i of the EMS array arr.
arr.writeXF(j, 'Mr. Jones') Unconditionally write the string 'Mr. Jones' to the EMS array arr at index j and atomically mark the element full.
arr.writeEF(2, v) Block until the element at index 2 of arr is empty, atomically write the value v and mark the memory full.
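The full/empty tag transitions of the read and write families can be summarized with a small single-process model. This is an illustrative sketch, not the EMS implementation: the class TaggedCell and its try* methods are hypothetical, the element is assumed to start empty, and where the real operations block, the model returns { ok: false }.

```javascript
// Toy single-process model of one EMS element's full/empty tag.
class TaggedCell {
    constructor() {
        this.value = undefined;
        this.tag = 'empty';  // Assumption: the element starts empty
    }
    // writeXF / writeXE: unconditional write, then force the tag
    writeXF(value) { this.value = value; this.tag = 'full'; }
    writeXE(value) { this.value = value; this.tag = 'empty'; }
    // writeEF: requires empty, leaves full
    tryWriteEF(value) {
        if (this.tag !== 'empty') return { ok: false };
        this.value = value;
        this.tag = 'full';
        return { ok: true };
    }
    // readFE: requires full, leaves empty
    tryReadFE() {
        if (this.tag !== 'full') return { ok: false };
        this.tag = 'empty';
        return { ok: true, value: this.value };
    }
    // readFF: requires full, leaves full
    tryReadFF() {
        if (this.tag !== 'full') return { ok: false };
        return { ok: true, value: this.value };
    }
}

const cell = new TaggedCell();
console.log(cell.tryReadFE().ok);          // false: nothing produced yet
console.log(cell.tryWriteEF('job-1').ok);  // true: empty -> full
console.log(cell.tryWriteEF('job-2').ok);  // false: still full, a producer would block
console.log(cell.tryReadFE().value);       // job-1: full -> empty
console.log(cell.tryWriteEF('job-2').ok);  // true: the consumer emptied the element
```

Pairing writeEF with readFE in this way gives a one-element producer/consumer handoff in which no value is overwritten before it is consumed.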
Atomic Fetch and Add
ARRAY METHOD emsArray.faa( index, value )

SYNOPSIS Atomically read the array's JSON primitive element (scalar or string, not array or object), add the value, and write the new value back to memory. Return the original contents of the memory.

ARGUMENTS index <Integer | String> Index of the element in the EMS array emsArray to atomically add to.
value < Number | Boolean | String | Undefined > Value to add to the EMS memory.

RETURNS < Number | Boolean | String | Undefined >
The results are the same type as if a + b were performed.
EXAMPLES oldVal = statistics.faa( timerIdx, elapsed ) Return the value in memory before the add operation.
currentSum = arr.faa( index, value ) + value Return the current value after the atomic operation has occurred.
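The return-the-old-value semantics can be shown with a single-threaded model on a plain JavaScript array. The helper modelFaa is hypothetical; EMS performs the read, add, and write as one atomic step across processes, which this sketch does not attempt.

```javascript
// Single-threaded model of fetch-and-add; not the EMS implementation.
function modelFaa(arr, index, value) {
    const old = arr[index];
    arr[index] = old + value;  // Result has the same type as JavaScript's a + b
    return old;                // The contents before the add
}

const stats = [10, 'ab'];
console.log(modelFaa(stats, 0, 5));    // 10
console.log(stats[0]);                 // 15
console.log(modelFaa(stats, 1, 'c'));  // ab
console.log(stats[1]);                 // abc
```

Because each caller receives a distinct old value, faa(index, 1) on a shared counter is a common way to hand out unique ticket or slot numbers.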
Atomic Compare and Swap
ARRAY METHOD emsArray.cas( index, oldValue, newValue )

SYNOPSIS Atomically read the JSON primitive element (scalar or string, not object or array) stored at the array's index, compare the original value to oldValue, and if they are equivalent store the new value. The CAS operation succeeded if the value returned is equivalent to oldValue. CAS will block until the EMS memory is marked full. CAS is the equivalent of atomically performing:
if( arr[idx] == oldValue ) then arr[idx] = newValue

ARGUMENTS index <Integer | String> Index into the EMS array to update.
oldValue <Number | Boolean | String | Undefined> Value to compare to the value stored in memory.
newValue <Any> Value to store if the value in memory is oldValue.

RETURNS < Number | Boolean | String | Undefined > The value in memory when the compare was performed.
EXAMPLES acquiredLock = arr.cas(index, UNLOCKED, LOCKED) == UNLOCKED Evaluates as true if the lock stored at arr[index] was acquired.
oldWord = users.cas(1234, 'Cooking', 'Eating') Attempt to atomically update user 1234's record from the string 'Cooking' to the string 'Eating'.
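The lock-acquisition idiom from the first example can be traced with a single-threaded model on a plain array. The helper modelCas is hypothetical, uses strict equality as a simplifying assumption, and does not model blocking on the full/empty tag.

```javascript
// Single-threaded model of compare-and-swap; not the EMS implementation.
function modelCas(arr, index, oldValue, newValue) {
    const current = arr[index];
    if (current === oldValue) arr[index] = newValue;  // Swap only on a match
    return current;  // Always the value seen when the compare was performed
}

const UNLOCKED = 0;
const LOCKED = 1;
const locks = [UNLOCKED];

// The first attempt sees UNLOCKED, so the swap happens and the lock is acquired
const first = modelCas(locks, 0, UNLOCKED, LOCKED) === UNLOCKED;
// The second attempt sees LOCKED, so memory is unchanged and acquisition fails
const second = modelCas(locks, 0, UNLOCKED, LOCKED) === UNLOCKED;

console.log(first);   // true
console.log(second);  // false
```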

Composed Array Operations

Composed operations use EMS intrinsics to perform deadlock free atomic operations involving multiple EMS elements. The composed operations use the tags and require data to be full or empty as appropriate for the semantics of the operation.

Transactional Processing of Multiple Elements
CLASS METHOD ems.tmStart( tmElements )
ems.tmEnd( tmHandle, doCommit )

SYNOPSIS Lock (by transitioning tags from Full to Empty) one or more EMS elements in a deadlock-free way. When multiple locks must be acquired, this function guarantees at least one thread will always make progress. The optional third entry of each tmElements item indicates that the element is read-only and will not be modified by the task while the lock is held. Read-only data is locked using a Readers-Writer lock, permitting additional concurrency.
Performing transactions within transactions can result in deadlock if the thread tries to recursively lock an element.

ARGUMENTS tmElements <Array> Array identifying which EMS array elements should be locked. Each array element is itself an array naming the EMS array and index/key of the data and an optional Read-Only hint: [ emsArray, index (, isReadOnly) ]
tmHandle <Object> Returned from tmStart(), contains state information needed to abort or commit the transaction.
doCommit <Boolean> Commits the transaction if true, or aborts and rolls back the transaction if false or undefined.

RETURNS ems.tmStart() : < tmHandle > Transaction Handle used later to commit or abort.
EXAMPLES tm = ems.tmStart( [ [users, 293, true], [comments, 8922] ] ) Lock element 293 in the users EMS array with a read-only intent, and also lock record 8922 in the comments EMS array.
tm = ems.tmStart( [ [arrA, idxA0], [arrA, idxA1] ] ) Lock indexes idxA0 and idxA1 in array arrA for update to both values.
tm = ems.tmStart([ [arrA, idxA0], [arrA, idxA1, true], [arrB, idxB0, true] ]) Acquire locks on the three listed elements. Element arrA[idxA0] may be modified, but elements arrA[idxA1] and arrB[idxB0] are read-only.
ems.tmEnd( tm, true ) Commit the transaction
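One common way to acquire several locks without deadlock is to have every task take them in the same global order. The sketch below shows that idea on tmElements-shaped input. It is an assumption for illustration, not a description of how tmStart is implemented; the helper canonicalOrder and the id field on the stand-in array objects are hypothetical.

```javascript
// Sort lock requests into one global order: by array id, then by index.
// The `id` field is a hypothetical stand-in for a unique EMS array identity.
function canonicalOrder(tmElements) {
    return tmElements.slice().sort(function (a, b) {
        if (a[0].id !== b[0].id) return a[0].id < b[0].id ? -1 : 1;
        if (a[1] === b[1]) return 0;
        return a[1] < b[1] ? -1 : 1;
    });
}

const users = { id: 1 };     // Stand-ins for EMS arrays
const comments = { id: 2 };

// Two tasks name the same elements in opposite orders...
const taskA = canonicalOrder([[comments, 8922], [users, 293, true]]);
const taskB = canonicalOrder([[users, 293, true], [comments, 8922]]);

// ...but both would lock users[293] first, then comments[8922], so neither
// can hold one lock while waiting on a lock the other already holds.
console.log(taskA.map(function (e) { return e[0].id + ':' + e[1]; }).join(' '));  // 1:293 2:8922
console.log(taskB.map(function (e) { return e[0].id + ':' + e[1]; }).join(' '));  // 1:293 2:8922
```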
Stacks & Queues
ARRAY METHOD emsArray.push( value )
emsArray.pop( )
emsArray.enqueue( value )
emsArray.dequeue( )

SYNOPSIS Append or remove data from a LIFO or FIFO. If the queue or stack is empty, the pop or dequeue operation returns Undefined, which is indistinguishable from an Undefined that was explicitly pushed onto the stack.

ARGUMENTS value <Any> Value to add to the queue or stack.

RETURNS emsArray.pop(), emsArray.dequeue() : < Any >
emsArray.push() : < Number > emsArray.enqueue() : < Number > The number of elements presently on the stack or queue
EXAMPLES comments.push( "Hello, world" ) Append the string to the EMS array comments
mostRecent = comments.pop() Atomically remove and return the value at the top of the stack.
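The ambiguity of an Undefined result noted in the synopsis can be demonstrated with in-process stand-ins built on plain JavaScript arrays. The classes ModelStack and ModelQueue are hypothetical and are not shared between processes; returning the new element count from push and enqueue is an assumption based on the RETURNS description.

```javascript
// Toy in-process stand-ins for an EMS stack and queue.
class ModelStack {
    constructor() { this.items = []; }
    push(value) { return this.items.push(value); }  // New number of elements
    pop() { return this.items.pop(); }              // undefined when empty (LIFO)
}
class ModelQueue {
    constructor() { this.items = []; }
    enqueue(value) { return this.items.push(value); }
    dequeue() { return this.items.shift(); }        // undefined when empty (FIFO)
}

const stack = new ModelStack();
stack.push('a');
stack.push('b');
console.log(stack.pop());  // b: last in, first out

const queue = new ModelQueue();
queue.enqueue('a');
queue.enqueue('b');
console.log(queue.dequeue());  // a: first in, first out

// An explicitly pushed undefined looks the same as popping an empty stack
const ambiguous = new ModelStack();
ambiguous.push(undefined);
const fromPushed = ambiguous.pop();
const fromEmpty = ambiguous.pop();
console.log(fromPushed === fromEmpty);  // true
```

If callers need to tell the two cases apart, one option is to never store undefined, for example by pushing null or a small wrapper object instead.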
Look up a key used to map a value
ARRAY METHOD emsArray.index2key( index )
SYNOPSIS Convert an index into an EMS array to the key used to map a value to that hashed index. This function can be used to iterate over all the elements of a mapped array.

ARGUMENTS index <Number> Index of the element in the EMS array to get the key for

RETURNS < Any > The key used to map the value which hashed to this index.
Freeing EMS Arrays
ARRAY METHOD emsArray.destroy( remove_file )
SYNOPSIS Release the persistent and non-persistent resources associated with an EMS array, or leave the data persisted if remove_file is false. Implies a barrier.

ARGUMENTS remove_file <Boolean> If true, remove the file, else allow the EMS file to persist.

RETURNS None.
Synchronize EMS Memory to Persistent Storage
ARRAY METHOD emsArray.sync( [ index [, nElements] ] )
SYNOPSIS Synchronize the EMS memory with persistent storage.

ARGUMENTS index <String | Number> (Optional, default = entire EMS array) Index of the element in the EMS array to synchronize to disk
nElements <Number> (Optional, only defined if index is also defined, default = 1) Number of sequential indexes, starting with index, that should be synchronized to disk

RETURNS < Boolean > True if memory was successfully synchronized to disk, otherwise false.
EXAMPLES users.sync( userID ) The user's record is committed to disk before the function returns.
TODO: Reduce
ARRAY METHOD emsArray.reduce( func )

SYNOPSIS Perform a parallel reduction on the elements of an EMS array.

ARGUMENTS func <Function> Function to combine this element with the partial reduction. Function arguments are func(element, sum), and the function returns a new sum. The arguments and results may be of any type.

RETURNS < Any >
EXAMPLES sum = distances.reduce( function( val, sum ) { return(sum+val) } ) Perform the arithmetic sum on the values in the array.
max = distances.reduce( function( val, currMax ) { return(val > currMax ? val : currMax) } ) Find the maximum value in the distances array.
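Since reduce is still marked TODO, the following is only a sequential reference for the func(element, sum) calling convention, written against a plain JavaScript array. The helper sequentialReduce is hypothetical, and seeding the partial result with the first element is an assumption, because the initial value is not specified above.

```javascript
// Sequential reference for a reduction using the func(element, sum) convention.
// Assumption: the first element seeds the partial result.
function sequentialReduce(values, func) {
    if (values.length === 0) return undefined;
    let partial = values[0];
    for (let i = 1; i < values.length; i++) {
        partial = func(values[i], partial);
    }
    return partial;
}

const distances = [3, 9, 4];
console.log(sequentialReduce(distances, function (val, sum) {
    return sum + val;
}));  // 16
console.log(sequentialReduce(distances, function (val, currMax) {
    return val > currMax ? val : currMax;
}));  // 9
```

A parallel version may combine partial results in a different order, so func should be associative and commutative for the result to be well defined.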
TODO: Permute
ARRAY METHOD emsArray.permute( order )

SYNOPSIS Reorder the array indexes to the ordering specified in the array order. If order contains the same index more than once, the output for those indexes is undefined.

ARGUMENTS order <Array> Integer permutation array.

EXAMPLES arr.permute( order ) Reorder the array arr to the new element ordering.
TODO: Scan
ARRAY METHOD emsArray.scan( func )

SYNOPSIS Perform a parallel prefix operation on the EMS array elements:
a[i] = func(a[i], a[i-1])

ARGUMENTS func <Function> Function to combine this element and the previous element. Function arguments are func(thisElement, previousElement), and returns a new partial result.

RETURNS < Array >
EXAMPLES partialSums = subtotals.scan( function(val, sum ) { return(sum+val) } ) Return an EMS array with the partial sums in each element.
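The prefix operation a[i] = func(a[i], a[i-1]) can be checked against a sequential reference on a plain JavaScript array. The helper sequentialScan is hypothetical; because scan is marked TODO, treating it as an inclusive scan that leaves element 0 unchanged is an assumption.

```javascript
// Sequential reference for the prefix operation a[i] = func(a[i], a[i-1]),
// where a[i-1] is the already-updated previous element.
function sequentialScan(values, func) {
    const out = values.slice();  // Leave the input untouched
    for (let i = 1; i < out.length; i++) {
        out[i] = func(out[i], out[i - 1]);
    }
    return out;
}

const subtotals = [3, 1, 4, 1, 5];
const partialSums = sequentialScan(subtotals, function (val, sum) {
    return sum + val;
});
console.log(partialSums.join(', '));  // 3, 4, 8, 9, 14
```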
TODO: Map
ARRAY METHOD emsArray.map( func [, name] )

SYNOPSIS Produce a new EMS array of the same length by invoking func() for each element in the array to produce the corresponding element in the new EMS array.

ARGUMENTS func <Function> Function to perform on each array element. The function is invoked with one argument, the EMS primitive element, and returns a new primitive element.
name <String> (Optional, default is non-persistent) If the new mapping should have a persistent backing on a filesystem, this assigns the fully qualified filename.

RETURNS < EMS Array >
EXAMPLES sizescm = sizesInches.map( function(inches) { return(inches * 2.54) } ) Scales each element in the array values from inches to cm, producing a new EMS array with scaled values.
TODO: Filter
ARRAY METHOD emsArray.filter( func [, name] )

SYNOPSIS Produce a new EMS array containing only the elements which evaluate as true when passed as the argument to func(element). The elements of the new array may appear in any order.

ARGUMENTS func <Function> Function to perform on each array element. The function is invoked with one argument, the EMS primitive element, and returns a boolean indicating if the element should be included in the new array.
name <String> (Optional, default is non-persistent) If the new mapping requires persistent backing on a filesystem, the fully qualified filename is defined.

RETURNS < EMS Array >
EXAMPLES bigParts = allParts.filter( function(part) { return(part.size > 100) } ) The new EMS array returned contains only parts whose size is greater than 100.

This browsing experience is Copyright ©2014-2020, Jace Mogill. Proudly designed and built in Cascadia.