EMS internally stores tags that are used for synchronization of user data, allowing synchronization to happen independently of the number or kind of processes accessing the data. The EMS primitives enforce atomic access using automatic state transitions, and higher level intrinsics like stacks, queues, and transactional memory are built on the EMS primitives.
EMS memory is an array of JSON values accessed using atomic operators and/or transactional memory. Safe parallel access is managed by passing through multiple gates: First mapping a key to an index, then accessing user data protected by EMS tags, and completing the whole operation atomically. |
EMS Data Tag Transitions & Atomic operations: F=Full, E=Empty, X=Don't Care, RW=Readers-Writer lock (# of current readers) CAS=Compare-and-Swap, FAA=Fetch-and-Add |
Operations which inspect or affect the global EMS state are performed
using class methods of
the EMS object returned by the require
statement.
Operations that modify the data and tags are performed using methods
belonging to EMS array to be operated on.
REQUIRE | require('ems') ( nThreads [, threadAffinity [, parallelType [, contextName ] ] ] ) | ||
SYNOPSIS | Initialize the EMS module, starting
all the other threads. Thread identity and processor affinity
is assigned when the thread is created.
|
||
ARGUMENTS | nThreads | <Number> | Total number of threads the job should use. |
threadAffinity | <Boolean> | (Optional, Default = false , Affects only Linux.)
Set the scheduling affinity of each thread to it's own
core, assigning over-subscribed threads in a round-robin
fashion.
|
|
parallelType | <String> | (Optional, Default=bsp )
One of bsp , fj , or user . Execution model:
bsp will use EMS' built-in Bulk Synchronous Parallel execution,
fj uses EMS' built-in Fork-join execution
user creates no parallelism
|
|
contextName | <String> | (Optional, Default=anonymous )
Unique name of parallel context being initialized, required to distinguish
between multiple EMS parallel programs running simultaneously on the
same system.
|
RETURNS | ems = { nThreads : Number, // Number of threads executing myID : Number // This thread's ID 0..nThreads-1 } | |
EXAMPLES | ems = require('ems')(process.argv[2]) | Use the first command line argument as the number of nodes:
node foo.js 4 executes using 4 threads. |
ems = require('ems')() | Run on one node | |
ems = require('ems')(process.argv[2], false, true) | Use first command line argument as number of nodes, do not set affinity of the threads to a specific CPU, execute using fork-join parallelism. |
CLASS METHOD | ems.new( [ nElements [, heapSize [, fileName] ] ] ) | ||
ems.new( emsArrayDescriptor ) | |||
SYNOPSIS | Attach to an existing or create a new EMS array.
Creation of new (do not use existing) EMS memory regions implies a barrier,
whereas using an existing EMS file will block until the file exists.
|
||
ARGUMENTS | nElements | <Number> | (Optional, Default is 1) Maximum number of elements in the EMS array or map. |
<Array> | An array of dimensions of a multi-dimensional array.
A 100×30×50 cube is described by [100, 30, 50] . |
||
heapSize | <Number> |
(Optional, Default is 0)
Maximum number of bytes reserved for strings,
arrays, maps, and object elements in this array.
The actual amount of memory allocated for use is
rounded up to the nearest power of 2
ems_alloc.h: EMS_MEM_BLOCKSZ
|
|
fileName | <String> | (Optional, Default is anonymous) Fully qualified file name of the file to use as a persistent backing store for the EMS array, tags, and bookkeeping information. | |
emsArrayDescriptor | <Object> |
(Alternative to two scalar argument) A complete EMS Array
descriptor may be passed as the only argument instead of
scalar arguments.
emsArrayDescriptor = {
dimensions : 100, // Required: Max # elements in EMS array
// Integer for 1-D, or array of dims [x, y, z]
heapSize : 100000, // Optional, default=0: Space, in bytes, for
// strings, maps, and objects. Rounded up to nearest power of two
mlock : 0, // Optional, default=0%: % of EMS memory to lock into RAM
useMap : true, // Optional, default=false: Map keys to indexes
useExisting : true, // Optional, default=false:
// Preserve data if an file already exists
persist : true, // Optional, default=true:
// Preserve the file after threads exit
doDataFill : false, // Optional, default=false: Initialize memory
dataFill : undefined, // Optional, If this property is defined,
// the EMS memory is filled with this value
setFEtags : 'full', // Optional, If defined, set 'full' or 'empty'
filename : '/path/to/file' // Optional, default=anonymous:
// Path to the persistent file of this array
}
|
RETURNS | <EMS Array> | |
EXAMPLES | var foo = ems.new(nItems) | Create a new non-persistent shared memory EMS array with no heap space. Scalar data (Number, Boolean, Undefined) may be stored in this array, but not strings, arrays, or objects. |
var foo = ems.new(nItems, size, '/tmp/EMS_foo') | Create a file-backed EMS shared memory
space with the filename /tmp/EMS_foo . In addition to the
scalar storage,
space for strings totaling
size bytes is also reserved for strings, arrays, objects, and maps. |
|
var x = ems.new(nItems, size) | Create a new non-persistent shared memory EMS
array that has space for strings totaling size bytes |
CLASS METHOD | ems.parallel( [args, ...] func ) | ||
SYNOPSIS | When using fork-join execution, the master
thread enters a parallel region by executing func
once from each process. The master process first starts all the
other processes running the function asynchronously, then executes the function
itself synchronously. All processes join (perform a barrier )
after completion of the work function.
The results of the function are discarded. Global variables on each node
are persistent between parallel regions.
|
||
ARGUMENTS | args | <Any> | Zero or more arguments to be passed to the function. |
func | <Function> | Function to be executed once on every thread. The optional arguments are used when calling the function. Return value is ignored. |
EXAMPLES | ems.parallel( doWork ) | The function doWork is executed by every thread. |
ems.parallel( foo, "Smith", 123, doWork ) | The function call doWork(foo, "Smith", 123)
is performed once by each process. |
CLASS METHOD | ems.parForEach( first, last, function [, scheduling [, minChunk] ] ) | ||
SYNOPSIS | Parallel loop execution, distributing
the iterations among all the threads. The function is invoked
with one argument, the current iteration number. Iterations are
divided among the threads according to the
scheduling method specified.
Parallel for loops must not be nested.
|
||
ARGUMENTS | first | <Number> | Index to start iterating |
last | <Number> | Index to stop iterating (non-inclusive). | |
func | <Function> | Loop body, only input argument is current loop index:
function foo(idx) {...} |
|
scheduling | <String> |
guided [minChunk] : Decreasing amounts of work are assigned to each task until minChunk iterations per thread is reached. Load balancing occurs when new chunks are assigned to threads.static : Equal number of iterations are given to each thread, no dynamic load balancing is performed.dynamic : All threads share one index which is atomically
incremented by 1 after each iteration. Provides ideal load balancing at the
cost of high per-iteration overhead. |
|
minChunk | <Number> | (Optional, only used when scheduling='guided' , default=1 ) Minimum number of iterations assigned to a single thread.
|
EXAMPLES | ems.parForEach(0, nItems-1, func) | Execute the func function nItems-1 times with indexes
0..nItems-1 , inclusive. |
ems.parForEach(10000, 20000, func, 'guided', 200) | Distribute iterations numbered 10,000-20,000 (inclusive) using the guided
method with a minimum chunk size of 200 iterations |
|
ems.parForEach(0, 999, func, 'static') | Execute func() 1,000 times with indexes
0..999 inclusive. Distribute the iterations evenly across
threads in contiguous blocks. |
CLASS METHOD | ems.barrier() | ||
SYNOPSIS | All the threads must reach the same barrier before proceeding. Failure to call a barrier from every process will result in deadlock. If called outside a parallel region, a barrier has no effect. |
EXAMPLES | ems.barrier() | All threads reach the barrier before proceeding. |
CLASS METHOD | ems.critical( func ) | ||
SYNOPSIS | Perform function func()
mutually exclusive of other threads. Serializes execution through
all critical regions.
|
||
ARGUMENTS | func | <Function> | Function to perform sequentially. |
EXAMPLES | ems.critical( function() { // Use a shared resources } ) | A shared resource is accessed sequentially, but in no particular order. |
CLASS METHOD | ems.master( func ) | ||
SYNOPSIS | Perform function func()
only on thread 0, implies a barrier. |
||
ARGUMENTS | func | <Function> | Function to perform only by task 0. |
EXAMPLES | ems.master( function() { console.log("Only task 0") } ) | Console logging performed only by task 0 |
CLASS METHOD | ems.single( func ) | ||
SYNOPSIS | Perform function func()
only once by the first thread to reach the statement.
Implies a barrier. |
||
ARGUMENTS | func | <Function> | Function to be performed once. |
EXAMPLES | ems.single( function() { console.log("Only first task") } ) | Console logging is performed only once. |
CLASS METHOD | ems.diag( message ) | ||
SYNOPSIS | Print a diagnostic message to the console with a prefix indicating the task ID. |
||
ARGUMENTS | message | <String> | Text of message to print to the console |
EXAMPLES | ems.diag( "Hello, world!" ) | EMS 3: Hello, world! appears on console |
ARRAY METHOD | emsArray.read( index ) | ||
emsArray.readFE( index ) | |||
emsArray.readFF( index ) |
|||
emsArray.readRW( index ), emsArray.releaseRW( index ) |
|||
SYNOPSIS | The read family of EMS
memory operations return the data stored in an EMS array element.
The value may be any JSON type.
|
||
ARGUMENTS | index | <Number | String> | Index in the EMS array of data to read |
RETURNS | read__ : < Number | Boolean | String | Undefined | Array | Object > | |
releaseRW : <Number> | Number of pending readers sharing the lock. | |
EXAMPLES | var n = histogram.read(3) | Read the value in bin 3 of the histogram. |
var word = dictionary.read(idx) | Read dictionary word at index/key idx in the dictionary array. |
|
var x = arr.readFE(idx) | Block until the element at index i
is full, atomically read the value and mark it empty. |
|
var x = arr.readFF(idx) | Block until the element at index i is full,
atomically read the value and leave it full. |
|
var x = arr.readRW(idx) | Acquire a shared-exclusive readers-writer lock. |
ARRAY METHOD | emsArray.write( index, value ) | ||
emsArray.writeXE( index, value ) | |||
emsArray.writeXF( index, value ) | |||
emsArray.writeEF( index, value ) |
|||
SYNOPSIS | Write a value to an element of an EMS array.
|
||
ARGUMENTS | index | <Number | String> | Index in the EMS array of data to read |
value | <Any> | Primitive value to store in the array at element numbered index. |
EXAMPLES | histogram.write(idx, 0) | Initialize the value of histogram[idx] to 0. |
dictionary.write(idx, "Hello") | Add the string "Hello" to the EMS
array dictionary at index idx . |
|
arr.writeXE(i, undefined) | Purge the memory at index i of the EMS array arr . |
|
arr.writeXF(j, 'Mr. Jones') | Unconditionally write the
string 'Mr. Jones' to the EMS array arr
at index j and atomically mark the element full. |
|
arr.writeEF(2, v) | Block until the element at index 2
of arr is empty, atomically write the
value v and mark the memory full. |
ARRAY METHOD | emsArray.faa( index, value ) |
||
SYNOPSIS |
Atomically read the array's JSON primitive element (scalar or string, not array or object),
add the value, and write the new value
back to memory. Return the original contents of the memory.
|
||
ARGUMENTS | index | <Integer | String> | Index of the element in the EMS array emsArray
to atomically add to. |
value | < Number | Boolean | String | Undefined > | Value to add to the EMS memory. |
RETURNS | < Number | Boolean | String | Undefined > |
The results are the same type as if
a + b were performed. |
EXAMPLES | oldVal = statistics.faa( timerIdx, elapsed ) | Return the value in memory before the add operation. |
currentSum = arr.faa( index, value ) + value | Return the current value after the atomic operation has occurred. |
ARRAY METHOD | emsArray.cas( index, oldValue, newValue ) |
||
SYNOPSIS |
Atomically read the JSON primitive element (scalar or string, not object or array)
stored at the array's index,
compare the original value to oldValue , and if they are equivalent
store the new value.
The CAS operation succeeded
if the value returned is equivalent
to oldValue .
CAS will block until the EMS memory is marked full. CAS is the
equivalent of atomically performing:if( arr[idx] == oldValue ) then arr[idx] = newValue
|
||
ARGUMENTS | index | <Integer | String> | Index into the EMS array to update. |
oldValue | <Number | Boolean | String | Undefined> | Value to compare to the value stored in memory. | |
newValue | <Any > | Value to store if the value in memory is oldValue
|
RETURNS | < Number | Boolean | String | Undefined > | The value in memory when the compare was performed. |
EXAMPLES | acquiredLock = arr.cas(index, UNLOCKED, LOCKED) == UNLOCKED | Evaluates as true if the lock stored
at arr[index] was acquired.
|
oldWord = users.cas(1234, 'Cooking', 'Eating') | Attempt to atomically update user 1234's record from the string
'Cooking' to the string 'Eating' |
Composed operations use EMS intrinsics to perform deadlock free atomic operations involving multiple EMS elements. The composed operations use the tags and require data to be full or empty as appropriate for the semantics of the operation.
CLASS METHOD | ems.tmStart( tmElements ) | ||
ems.tmEnd( tmHandle, doCommit ) |
|||
SYNOPSIS |
Lock (by transitioning tags from Full to Empty)
one or more EMS elements in a deadlock
free way. When multiple locks must be acquired, this function
guarantees at least one thread will always make progress. The
optional third element indicates the element is read-only and will
not be modified by the task while the lock is held. Read-only
data is locked using a Readers-Writer lock, permitting additional concurrency.
Performing transactions within transactions can result in deadlock if the thread tries to recursively lock an element. |
||
ARGUMENTS | tmElements | <Array> | Array identifying which EMS array
elements should be locked. Each array element is itself an array
naming the EMS array and index/key of the data
and an optional Read-Only hint:
[ emsArray, index (, isReadOnly) ]
|
tmHandle | <Object> | Returned from tmStart() ,
contains state information needed to abort or commit the transaction.
|
|
doCommit | <Boolean> | Commits the transaction if true ,
or aborts and rolls back the transaction if false or undefined .
|
RETURNS | ems.tmStart() : < tmHandle > | Transaction Handle used later to commit or abort. |
EXAMPLES | tm = ems.tmStart( [ [users, 293, true], [comments, 8922] ] ) | Lock element 293 in the users
EMS array with a read-only intent, and also lock record 8922 in
the comments EMS array.
|
tm = ems.tmStart( [ [arrA, idxA0], [arrA, idxA1] ] ) | Lock indexes idxA0 and idxA1 in array arrA
for update to both values.
|
|
tm = ems.tmStart([ [arrA, idxA0], [arrA, idxA1, true], [arrB, idxB0, true] ]) | Acquire and free locks on the elements in lockList .
Element arrA[idxA0] may be modified, but elements
arrA[idxA1] and arrB[idxB0] are read-only.
|
|
ems.tmEnd( tm, true ) | Commit the transaction |
ARRAY METHOD | emsArray.push( value ) |
||
emsArray.pop( ) |
|||
emsArray.enqueue( value ) |
|||
emsArray.dequeue( ) |
|||
SYNOPSIS | Append or remove data from a LIFO or
FIFO. If the queue or stack is empty, the pop
or dequeue operation returns
Undefined , which is indistinguishable from an Undefined
that was explicitly pushed onto the stack.
|
||
ARGUMENTS | value | <Any> | Value to add to the queue or stack. |
RETURNS | emsArray.pop(), emsArray.dequeue() : < Any > | |
emsArray.push() : < Number > emsArray.enqueue() : < Number > | The number of elements presently on the stack or queue | |
EXAMPLES | comments.push( "Hello, world" ) | Append the string to the EMS array comments |
mostRecent = comments.pop() | Atomically return the value at the top of the stack. |
CLASS METHOD | emsArray.index2key( index ) | ||
SYNOPSIS |
Convert an index into an EMS array to the key used to map
a value to that hashed index. This function can be used
to iterate over all the elements of a mapped array.
|
||
ARGUMENTS | index | <Number> | Index of the element in the EMS array to get the key for |
RETURNS | < Any > | The key used to map the value which hashed to this index. |
CLASS METHOD | emsArray.destroy( remove_file ) | ||
SYNOPSIS |
Release the persistent and non-persistent resources associated with an EMS array,
alternatively persisting the data if remove_file is false .
Implies a barrier.
|
||
ARGUMENTS | remove_file | <Boolean> | If true, remove the file, else allow the EMS file to persist. |
RETURNS | None. |
CLASS METHOD | emsArray.sync( [ index [, nElements] ] ) | ||
SYNOPSIS |
Synchronize the EMS memory with persistent storage.
|
||
ARGUMENTS | index | <String | Number> | (Optional, default = entire EMS array) Index of the element in the EMS array to synchronize to disk |
nElements | <Number> |
(Optional, only defined if index is also defined,
default = 1)
Number of sequential indexes, starting with index ,
that should be synchronized to disk |
RETURNS | < Boolean > | True if memory was successfully synchronized to disk, otherwise false. |
EXAMPLES | users.sync( userID ) | The user's record is committed to disk before the function returns. |
ARRAY METHOD | emsArray.reduce( func ) |
||
SYNOPSIS | Perform a parallel reduction on the elements of an EMS array. |
||
ARGUMENTS | func | <Function> | Function to combine this element with the
partial reduction. Function arguments are func(element,
sum) , and the function returns a new sum .
The arguments and results may be of any type. |
RETURNS | < Any > | |
EXAMPLES | sum = distances.reduce( function( val, sum ) { return(sum+val) } ) | Perform the arithmetic sum on the values in the array. |
max = distances.reduce( function( val, currMax ) { return(val > currMax ? tmp : currMax) } ) | Find the maximum value in the distance array. |
ARRAY METHOD | emsArray.permute( order ) |
||
SYNOPSIS | Reorder the array indexes to the
ordering specified in the
array order . If order contains the same
index more than once, the output for those indexes is
undefined. |
||
ARGUMENTS | order | <Array> | Integer permutation array. |
EXAMPLES | arr.permute( order ) | Reorder the array arr
to the new element ordering. |
ARRAY METHOD | emsArray.scan( func ) |
||
SYNOPSIS | Perform a parallel prefix operation on
the EMS array elements:a[i] = func(a[i], a[i-1]) |
||
ARGUMENTS | func | <Function> | Function to combine this element and the
previous element. Function arguments arefunc(thisElement, previousElement) ,
and returns a new partial result. |
RETURNS | < Array > | |
EXAMPLES | partialSums = subtotals.scan( function(val, sum ) { return(sum+val) } ) | Return a EMS array with the partial sums in each element. |
ARRAY METHOD | emsArray.map( func [, name] ) |
||
SYNOPSIS | Produce a new EMS array of the same length
by invoking func()
for each element in the array to produce the corresponding element
in the new EMS array.
|
||
ARGUMENTS | func | <Function> | Function to perform on each array element. The function is invoked with one argument, the EMS primitive element, and returns a new primitive element. |
Persistent filename | <String> | (Optional, default is non-persistent) If the new mapping should have a persistent backing on a filesystem, this assigns the fully qualified filename. |
RETURNS | < Number | Boolean | String | Undefined > | |
EXAMPLES | sizescm = sizesInches.map( function(inches) { return(inches * 2.54) } ) | Scales each element in the array values from inches to cm, producing a new EMS array with scaled values. |
ARRAY METHOD | emsArray.filter( func [, name] ) |
||
SYNOPSIS | Produce a new EMS array containing only the elements
which evaluate as true when passed as the argument to func(element) .
The elements of the new array may appear in any order.
|
||
ARGUMENTS | func | <Function> | Function to perform on each array element. The function is invoked with one argument, the EMS primitive element, and returns a boolean indicating if the element should be included in the new array. |
name | <String> | (Optional, default is non-persistent) If the new mapping requires persistent backing on a filesystem, the fully qualified filename is defined. |
RETURNS | < EMS Array > | |
EXAMPLES | bigParts = allParts.filter( function(part) { return(part.size > 100) } ) | The new EMS array returned contains only parts whose size is greater than 100. |