This section provides additional details about using the MapReduce library and how it is implemented. These topics are covered:
As explained in this section, keys and values are variable-length strings of bytes. The MR-MPI library knows nothing of their contents and simply treats them as contiguous chunks of bytes.
When you register a key and value in your mymap() or mycompress() or myreduce() function via the KeyValue add() method, you specify their lengths in bytes. Keys and values are typically returned to your program for further processing or output, e.g. as arguments passed to your myreduce() function by the reduce() operation, as are their lengths.
Keys and values are passed as character pointers to your functions where you may need to convert the pointer to an appropriate data type and then correctly interpret the byte string. For example, either of these lines could be used:
int *iptr = (int *) key; int myvalue = *(int *) key;
If the key or value is a variable-length text string, you may want to terminate it with a "0", and include the trailing "0" in the byte count, so that C-library-style string functions can later be invoked on it. If a key or value is a complex data structure, your function must be able to decode it.
IMPORTANT NOTE: An eaay way to encapsulate several datums as a key (or value) is to create a C struct that includes each of them. Then the sizeof() function gives the byte count of the struct and the compiler takes care of data alignment issues, as described below. If you do this for creating a key, then be aware that your individual datums may not use up all the bytes returned by the sizeof() function. Again this is due to alignment constraints imposed by the compiler. Normally this isn't something your code would worry about since you only acces the datums, but if the struct is used as a key, and some bytes in the key are never intialized (by you filling in the datums), then when that key is hashed by the MR-MPI library. e.g. to perform a collate() operation, those uninitialized bytes will also be hashed. Since the uninitialed bytes may contain random garbage, this means 2 keys with identical datums, might not hash identically, and thus their values would not be combined as you expect into a single KeyMultiValue. The only solution for this is for you to initialize the struct before setting its datums, e.g.
typedef struct double x; int i; Tuple; Tuple tuple; memset(&tuple,0,sizeof(Tuple)); tuple.x = 1.0; tuple.i = 1;
The memset() function initializes the entire tuple to 0. Note that in this case sizeof(Tuple) is likely 16 bytes, but the x and i datums will only set 12 of the 16 bytes, leaving the last 4 uninitialized. Also note that this whole discussion is irrelevant if the struct is used only as a value, since only keys are hashed.
A related issue with keys and values is the byte-alignment of integer or floating point values they include. For example, it is usually a bad idea to store an 8-byte double such that it is mis-aligned with respect to an 8-byte boundary in memory. The reason is that using a mis-aligned double in a computation may be slow.
If your keys or values are homogeneous (e.g. all integers), you can use the keyalign and valuealign settings, discussed here, to insure alignment of keys and values to desired byte boundaries. Since this may incur extra memory costs, you should not typically make these settings larger than needed.
Special care may need to be taken if your values are heterogeneous, e.g. a mixture of strings and integers. This is because the MR-MPI library packs values one after the other into one long byte string when it is returned to your program as a multi-value, e.g. as an argument to the callback of a reduce() method. Only the first value in the multi-value is aligned to the valuealign setting. Similarly, the collapse() method creates a multi-value that is sequence of key,value,key,value,etc from a KV. If the keys are variable-length text strings and the values are integers, then the values will not be aligned on 4-byte boundaries.
Here are two ideas that can be used to insure alignment of heterogeneous data:
(a) Say your "value" is a 4-byte integer followed by an 8-byte double. You might think it can be stored and registered as 12 contiguous bytes. However, this would likely mean the double is mis-aligned. One solution is to convert the integer to a double before storing both quantities in a 16-byte value string. Another solution is to create a struct to store the integer and double and use the sizeof() function to determine the length of the struct and use that as the length of your "value". The compiler should then guarantee proper alignment of each structure member. If you use such a struct as a key, be aware of the "IMPORTANT NOTE" explained above.
(b) Your callback function can always copy the bytes of a key or value into a local data structure with the proper alignment, e.g. using the C memcpy() function. E.g. in the collapse example above, these lines of code:
int myvalue; memcpy(&myvalue,&multivalue[offset],sizeof(int));
would load the 4 bytes of a particular value (at location offset) in the multi-value into the local integer "myvalue", where it can then be used for computation.
KeyValue and KeyMultiValue objects are described in this section. A MapReduce object contains either a single KeyValue object (KV) or a single KeyMultiValue object (KMV), depending on which methods you have invoked.
The memory cost for storing key/value pairs in a KV is as follows. The key and value each have a byte length. Two integers are also stored for the key and value length. There may also be additional bytes added to align the key and value on byte boundaries in memory; see the keyalign and valuealign settings, discussed in this section. Thus the total size of a KV is the memory for the key/value datums plus 2 integers per pair plus any extra alignment bytes.
A KMV contains key/multi-value pairs where the number of pairs is typically the number of unique keys in the original KV. The memory cost for storing key/multi-value pairs in a KMV is as follows. The key and multi-value each have a byte length. For the multi-value, this is the sum of individual value lengths. Again, there may also be additional bytes added to align the key and multi-value on byte boundaries in memory; see the keyalign and valuealign settings, discussed in this section. Three integers are also stored: the key and multi-value length, and the number of values N in the multi-value. An N-length array of integers is also stored for the length of each value in the multi-value. Thus the total size of a KMV is the memory for the key/multi-value datums plus 3 integers per pair plus 1 integer per value in the original KV plus any extra alignment bytes.
Note that memory for key data in a KMV is typically less than in the original KV, since the KMV only stores unique keys. The memory for multi-value data is the same as the value data in the original KV, since all the original KV values are contained in the multi-values.
Note that in parallel, for a KV or KMV, each processor stores the above data for only a fraction of key/value pairs it generated during a map() operation or acquired during other operations, like a collate(). If this is imbalanced, one processor may own and process datums more than other processors.
If KV or KMV data on a processor exceeds the page size determined by the memsize setting, discussed here, then data is written to temporary disk files, on a per-processor basis.
If the KV or KMV pairs of a data set owned by a processor fit within a single page of memory, whose size is determined by the memsize setting, then the MR-MPI library operates on the data in-core; no disk files are written or read.
When the data on any single processor exceeds the page size, that processor will write data, one page at a time, to one or more temporary disk files, and later read it back in as needed, again one page at a time. Thus all the MR-MPI methods can be invoked on data sets larger than fit in the aggregate memory of the processors being used. The only real limitation in this case is available disk space.
All of the MR-MPI methods, except one, perform their operations within a fixed number of memory pages. This includes memory needed for message passing calls to the MPI library, e.g. buffers used to send and receive data. Any large data exchanges are performed with pre-posted receives (MPI_Irecv) into user-space memory, which do not require additional internal MPI library memory.
The number of required pages ranges from 1 to 7, and is listed on this page for each MR-MPI library method. This means, for example, that even if the page size is 1 Mb (smallest allowed value), and the data set size is 10 Gb per processor, and the sort_keys() method is invoked, which requires 5 pages per processor, that the operation will successfully complete, using only 5 Mb per processor. Of course, there may be considerable disk I/O performed along the way.
The one exception is the convert() method, also called by the collate() and commpress() methods, which performs an on-processor reorganization of the data in a KV to produce a KMV. For large data sets this requires breaking up the large KV data file into smaller files, each of which holds data that will contribute to one page of the eventual KMV file. Each smaller file requires an in-memory buffer to store data that is written to the file. The number of these smaller files, and hence the number of buffers, is hard to predict in advance or even bound. It depends on the page size and the characteristics of the KV pairs, e.g. how many unique keys there are. The number of extra allocated pages needed to store these buffers depends of the number of small files and the minimum buffer size, which is currently set at 16K bytes for reasonable disk I/O performance. If a very large number of small files are needed to partition the KV data and the page size is small, then several extra memory pages may need to be allocated. This is not normally the case, but the number of small files and number of allocated pages can be monitored if the verbosity setting is non-zero. Note that a larger page size will reduce the number of extra pages the convert() method needs to allocate.
IMPORTANT NOTE: You should choose a memsize setting that insures the total memory consumed by all pages allocated by all the MapReduce objects you create, does not exceed the physical memory available (which may be shared by several processors if running on a multi-core node). If you do this, then many systems will allocate virtual memory, which will typically cause MR-MPI library operations to run very slowly and thrash the disk.
Also note that in addition to "pages", there are numerous additional small allocations of memory made by the MR-MPI library. Here are two examples. The aggregate() method allocates vectors of length P = the number of processors. Out-of-core disk files are stored as "pages" of data. Each page requires some in-memory bookkeeping so it can be written and read. Thus if a file grows to 1000s of pages, the corresponding in-memory bookkeeping structure will also become larger. For normal page sizes as determined by the memsize setting, e.g. the 64 Mbyte default, these additional in-memory allocations should be small compared to the size of a single page.
Even in out-of-core mode, the MR-MPI library has limitations on the data set sizes it can process. In practice, these are hopefully not restrictive limits.
Define:
Internal storage limits within library:
KV = KeyValue, KMV = KeyMultiValue
Additional notes:
The user sets the "pagesize" via the memsize setting, in Mbytes. The pagesize can exceed INTMAX, though it should not exceed the physical memory available. See the discussion above for more details.
Since the data set size is written to disk, when the library operates in out-of-core mode, the data size cannot exceed available disk space, either on a per-processor basis (if each processor is writing to its own local disk), or in aggregate (e.g. for a parallel file system). Some MR-MPI operations convert data from one form to another (e.g. KV to KMV) or make intermediate copies of data (e.g. for sorting). At a minimum this typically requires 2x the disk space of the data set itself.
As discussed here, a KeyValue pair requires 2 integers plus the key and value, plus alignment space. For a 1-byte key and a 0-byte value, this is a minimum of 12 bytes. By storing no more than INTMAX KeyValue pairs on a page, this still allows for pagesizes of nearly 24 Gb, more if KeyValue pair sizes are larger.
The various INTMAX limits mean that user calls to the library, and library callbacks to user functions can use int parameters rather than uint64 parameters. It also reduces storage requirements for individual KeyValue and KeyMultiValue pairs. One exception is that all the library methods return a uint64 for the final number of KeyValue or KeyMultiValue pairs stored by the library. Another exception is the uint64 "itask" variable passed back to one flavor of the user mymap() function via the map() method.
The INTMAX limits on the number of KeyMultiValue values stored in one page, mean that individual KeyMultiValue pairs that exceed this will be split across multiple pages. The user callback functions access these pages via the multivalue_blocks() and multivalue_block() methods, described witht the reduce() method.
The convert() and collate() methods use a hash function to organize keys and find duplicates. The MR-MPI library uses the hashlittle() function from lookup3.c, written by Bob Jenkins and available freely on the WWW. It operates on arbitrary-length byte strings (a key) and produces a 32-bit integer hash value, a portion of which is used as a bucket index into a hash table.
Several of the library methods take a callback function as an argument, meaning that function is called back to from the library when the method is invoked. These functions are part of your MapReduce program and can perform any operation you wish on your data (or on no data), so long as they produce the appropriate information. E.g. they generate key/value pairs in the case of map() or compress() or reduce(), or they hash a key to a processor in the case of aggregate() or collate(), or they compare two keys or values in the case of sort_keys() or sort_values().
The mymap() and myreduce() functions can perform simple operations or very complex, compute-intensive operations. For example, if your parallel machine supports it, they could invoke another program or script to read/parse an input file or calculate some result.
Note that in your program, a callback function CANNOT be a class method unless it is declared to be "static". It can also be a non-class method, i.e. just a stand-alone function. In either case, such a function cannot access class data.
One way to get around this restriction is to define global variables that allow your function to access information it needs.
Another way around this restriction is to use the feature provided by several of the library methods with callback function arguments which allow you to pass in a pointer to whatever data you wish. This pointer is returned as an argument when the callback is made. This pointer should be cast to (void *) when passed in, and your callback function can later cast it back to the appropriate data type. For example, a class could set the pointer to an array or an internal data structure or the class itself as "(void *) this". Specify a NULL if your function doesn't need the pointer.
Using the MR-MPI library from Python incurs two not-so-obvious overheads beyond the usual slowdown due to using an interpreted language. First, Python objects used as keys and values are "pickled" and "unpickled" using the cPickle Python library when passed into and out of the C++ library. This is because the library stores them as byte strings. The pickling process serializes a Python object (e.g. an integer, a string, a tuple, or a list) into a byte stream in a way that it can be unpickled into the same Python object.
The second overhead is due to the complexity of making a double callbacks between the library and your Python script. I.e. the library calls back once to the user program which then calls back into the library. Consider what happens during a map() operation when the library is called from a C++ program.
When doing this from Python there are 3 additional layers between the Python program and the library, the Python mrmpi class, an invisible C layer (created by ctypes), and the C interface on the C++ library itself. Thus the callback operation proceeds as follows:
Thus 3 calls have become 11 due to the 3 additional layers data must pass through. Some of these pass throughs are very simple, but others require massaging and copying of data, like the pickling/unpickling described above, which occurs in the mrmpi class methods. I was somewhat surprised this double-callback sequence works as well and as transparently as it does - Python ctypes is amazing!
The error messages printed out by the MR-MPI library are hopefully self-explanatory. At some point they will be listed in these doc pages.