uint64_t MapReduce::compress(void (*mycompress)(char *, int, char *, int, int *, KeyValue *, void *), void *ptr)
uint64_t MapReduce::multivalue_blocks(int &nblocks)
int MapReduce::multivalue_block(int iblock, char **ptr_multivalue, int **ptr_valuesizes)
This calls the compress() method of a MapReduce object, passing it a function pointer to a mycompress function you write. This method compresses a KeyValue object with duplicate keys into a new KeyValue object, where each key appears once (on that processor) and has a single new value. The new value is a combination of the values associated with that key in the original KeyValue object. The mycompress() function you provide generates the new value, once for each unique key (on that processor). The method returns the total number of key/value pairs in the new KeyValue object.
This method is used to compress a large set of key/value pairs produced by the map() method into a smaller set before proceeding with the rest of a MapReduce operation, e.g. with a collate() and reduce().
You can give this method a pointer (void *ptr) which will be passed back to your mycompress() function. See the Technical Details section for why this can be useful. Specify NULL if you don't need it.
In this example the user function is called mycompress() and it must have the following interface, which is the same as that used by the reduce() method:
void mycompress(char *key, int keybytes, char *multivalue, int nvalues, int *valuebytes, KeyValue *kv, void *ptr)
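As a rough sketch of how the ptr argument might be used, the function below has the mycompress() interface and treats ptr as a pointer to a caller-owned struct of running counters. The Stats struct and the count_pairs name are hypothetical and not part of the library. KeyValue is only forward-declared so the fragment compiles without the library headers, and the sketch does not register a compressed key/value pair or handle multi-values that span more than one page.

```cpp
#include <cstdint>

class KeyValue;  // forward declaration only; the real class comes from the library headers

// Hypothetical caller-owned state, handed to compress() as the void *ptr argument.
struct Stats {
  uint64_t nkeys = 0;    // number of unique keys seen on this processor
  uint64_t nvalues = 0;  // total number of values seen across those keys
};

// Callback with the mycompress() interface; it only updates the caller's counters.
void count_pairs(char *key, int keybytes, char *multivalue, int nvalues,
                 int *valuebytes, KeyValue *kv, void *ptr)
{
  (void) key; (void) keybytes; (void) multivalue; (void) valuebytes; (void) kv;
  Stats *stats = static_cast<Stats *>(ptr);  // recover the caller's struct
  stats->nkeys++;
  stats->nvalues += static_cast<uint64_t>(nvalues);
}
```

With the library, the call would presumably look like mr->compress(count_pairs, &stats), after which stats holds the per-processor totals.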
A single key/multi-value (KMV) pair is passed to your function from a temporary KeyMultiValue object created by the library. For each unique key in the KeyValue object, that object creates a multi-value containing the list of the nvalues values associated with that key. Note that these are only the values on this processor, not across all processors.
There are two possibilities for a KMV pair passed to your function. The first is that it fits in one page of memory allocated by the MapReduce object, which is the usual case. See the memsize setting for details on memory allocation.
In this case, the char *multivalue argument is a pointer to the beginning of the multi-value which contains all nvalues, packed one after the other. The int *valuebytes argument is an array which stores the length of each value in bytes. If needed, it can be used by your function to compute an offset into char *multivalue for where each individual value begins. Your function is also passed a kv pointer to a new KeyValue object created and stored internally by the MapReduce object.
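The offset computation described above can be sketched as follows. The helper name unpack_values is hypothetical. It assumes the values are packed back to back with no padding between them, and it copies each value into a std::string purely for illustration; a real callback would more likely process the bytes in place.

```cpp
#include <string>
#include <vector>

// Split a packed multi-value into individual values using the valuebytes lengths.
std::vector<std::string> unpack_values(const char *multivalue, int nvalues,
                                       const int *valuebytes)
{
  std::vector<std::string> values;
  values.reserve(static_cast<std::vector<std::string>::size_type>(nvalues));
  int offset = 0;  // byte offset of value i from the start of the multi-value
  for (int i = 0; i < nvalues; i++) {
    values.emplace_back(multivalue + offset,
                        static_cast<std::string::size_type>(valuebytes[i]));
    offset += valuebytes[i];  // the next value starts right after this one
  }
  return values;
}
```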
If the KMV pair does not fit in one page of memory, then the meaning of the arguments passed to your function is changed. Your function must call two additional library functions in order to retrieve a block of values that does fit in memory, and process them one block at a time.
In this case, the char *multivalue argument will be NULL and the nvalues argument will be 0. Either of these can be tested for within your function. If you know huge multi-values will not occur, then the test is not needed. The meaning of the kv and ptr arguments is the same as discussed above. However, the int *valuebytes argument is changed to be a pointer to the MapReduce object. This is to allow you to make the following two kinds of calls back to the library:
MapReduce *mr = (MapReduce *) valuebytes;
int nblocks;
uint64_t nvalues_total = mr->multivalue_blocks(nblocks);
for (int iblock = 0; iblock < nblocks; iblock++) {
  int nv = mr->multivalue_block(iblock,&multivalue,&valuebytes);
  for (int i = 0; i < nv; i++) {
    // process each value within the block of values
  }
}
The call to multivalue_blocks() returns the total number of values in the multi-value (as an unsigned 64-bit integer) and sets its nblocks argument to the number of blocks of values. Each call to multivalue_block() retrieves one block of values and returns the number of values in that block (nv in this case). The multivalue and valuebytes arguments are pointers to a char * and int * (i.e. a char ** and int **), which will be set to point to the block of values and their lengths respectively, so they can then be used just as the multivalue and valuebytes arguments in the mycompress() callback itself (when the values do not exceed available memory).
Note that in this example we are re-using (and thus overwriting) the original multivalue and valuebytes arguments as local variables.
Also note that your mycompress() function can call multivalue_block() as many times as it wishes and process the blocks of values multiple times or in any order, though looping through blocks in ascending order will typically give the best disk I/O performance.
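To make the two cases concrete, here is a sketch of a helper that sums integer values whether the multi-value arrives in one page or in blocks. The MapReduce class in this fragment is a small stand-in written only so the example compiles and runs on its own. It is not the library class, and it mimics only the two block-access calls in the form used in the snippet above. The names sum_block and sum_values are likewise hypothetical, and all values are assumed to be ints.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Stand-in for illustration only (NOT the library class): holds blocks of int values.
class MapReduce {
 public:
  std::vector<std::vector<int>> blocks;  // each inner vector is one block of values
  std::vector<int> sizes;                // value lengths for the most recent block

  uint64_t multivalue_blocks(int &nblocks) {
    nblocks = static_cast<int>(blocks.size());
    uint64_t total = 0;
    for (const auto &b : blocks) total += b.size();
    return total;
  }
  int multivalue_block(int iblock, char **ptr_multivalue, int **ptr_valuesizes) {
    std::vector<int> &b = blocks[iblock];
    sizes.assign(b.size(), static_cast<int>(sizeof(int)));
    *ptr_multivalue = reinterpret_cast<char *>(b.data());
    *ptr_valuesizes = sizes.data();
    return static_cast<int>(b.size());
  }
};

// Sum nv packed int values, stepping through them with the valuebytes lengths.
static long long sum_block(const char *multivalue, int nv, const int *valuebytes)
{
  long long sum = 0;
  int offset = 0;
  for (int i = 0; i < nv; i++) {
    int v;
    std::memcpy(&v, multivalue + offset, sizeof(int));  // safe for unaligned values
    sum += v;
    offset += valuebytes[i];
  }
  return sum;
}

// Handle both cases: a multi-value that fits in one page, or one split into blocks.
long long sum_values(char *multivalue, int nvalues, int *valuebytes)
{
  if (multivalue != nullptr) return sum_block(multivalue, nvalues, valuebytes);

  // Multi-page case: valuebytes is really a pointer to the MapReduce object.
  MapReduce *mr = reinterpret_cast<MapReduce *>(valuebytes);
  int nblocks;
  mr->multivalue_blocks(nblocks);
  long long sum = 0;
  for (int iblock = 0; iblock < nblocks; iblock++) {
    int nv = mr->multivalue_block(iblock, &multivalue, &valuebytes);
    sum += sum_block(multivalue, nv, valuebytes);
  }
  return sum;
}
```

Testing multivalue against NULL before touching valuebytes keeps the single-page path unchanged and confines the cast to the multi-page branch.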
Your mycompress() function should typically produce a single key/value pair which it registers with the MapReduce object by calling the add() method of the KeyValue object. The syntax for this call is described on the doc page for the KeyValue add() method. For example, if the set of nvalues were integers, the compressed value might be the sum of those integers.
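The integer-sum case mentioned above might look like the following sketch. The KeyValue class here is a minimal stand-in so the fragment compiles without the library. Its add(key, keybytes, value, valuebytes) argument list is an assumption that should be checked against the KeyValue add() doc page, and the sum_compress name is hypothetical. The sketch handles only multi-values that fit in one page.

```cpp
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for the library's KeyValue class (illustration only).
// Assumption: add() takes (key, keybytes, value, valuebytes).
class KeyValue {
 public:
  std::vector<std::pair<std::string, std::string>> pairs;  // recorded key/value bytes
  void add(char *key, int keybytes, char *value, int valuebytes) {
    pairs.emplace_back(std::string(key, static_cast<std::string::size_type>(keybytes)),
                       std::string(value, static_cast<std::string::size_type>(valuebytes)));
  }
};

// Compress all int values of one key into a single value: their sum.
void sum_compress(char *key, int keybytes, char *multivalue, int nvalues,
                  int *valuebytes, KeyValue *kv, void *ptr)
{
  (void) ptr;  // no user data needed in this sketch
  int sum = 0;
  int offset = 0;
  for (int i = 0; i < nvalues; i++) {
    int v;
    std::memcpy(&v, multivalue + offset, sizeof(int));  // values may be unaligned
    sum += v;
    offset += valuebytes[i];
  }
  // Register one new key/value pair for this key.
  kv->add(key, keybytes, reinterpret_cast<char *>(&sum), static_cast<int>(sizeof(int)));
}
```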
See the Settings and Technical Details sections for details on the byte-alignment of keys and values that are passed to your mycompress() function and on those you register with the KeyValue add() methods. Note that only the first value of a multi-value (or of each block of values) passed to your mycompress() function will be aligned to the valuealign setting.
This method is an on-processor operation, requiring no communication. When run in parallel, each processor operates only on the key/value pairs it stores. Thus you are NOT compressing all values associated with a particular key across all processors, but only those currently owned by one processor.
Related methods: collate()