MapReduce aggregate() method

uint64_t MapReduce::aggregate(int (*myhash)(char *, int)) 

This calls the aggregate() method of a MapReduce object, which reorganizes a KeyValue object across processors into a new KeyValue object. In the original object, duplicates of the same key may be stored on many processors. In the new object, all duplicates of a key are stored by the same processor. The method returns the total number of key/value pairs in the new KeyValue object, which will be the same as the number in the original object.

A hashing function is used to assign keys to processors. Typically you will not care how this is done, in which case you can pass NULL, i.e. mr->aggregate(NULL), and the MR-MPI library will use its own internal hash function, which will distribute the keys randomly and hopefully evenly across processors.
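
The redistribution described above can be illustrated with a small serial simulation. This is only a sketch, not the library's implementation: the KV type, the simulate_aggregate() and total_pairs() functions, and the use of std::hash in place of the library's internal hash are all assumptions made for illustration, and no MPI communication takes place.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one key/value pair (not an MR-MPI type).
using KV = std::pair<std::string, int>;

// Serial simulation of aggregate(): before[p] holds the pairs owned by
// "processor" p. Every pair moves to the processor selected by hashing
// its key, so all duplicates of a key end up in the same bucket.
// std::hash is an assumption standing in for the library's internal hash.
std::vector<std::vector<KV>> simulate_aggregate(
    const std::vector<std::vector<KV>> &before) {
  const std::size_t P = before.size();
  std::vector<std::vector<KV>> after(P);
  for (const auto &owned : before)
    for (const auto &kv : owned)
      after[std::hash<std::string>{}(kv.first) % P].push_back(kv);
  return after;
}

// Total number of pairs across all simulated processors.
std::size_t total_pairs(const std::vector<std::vector<KV>> &kvs) {
  std::size_t n = 0;
  for (const auto &owned : kvs) n += owned.size();
  return n;
}
```

In this sketch the total pair count is unchanged by the move, which corresponds to the return value described for aggregate().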

On the other hand, if you know the best way to do this for your data, then you should provide the hashing function. For example, if your keys are integer IDs for particles or grid cells, you might want to use the ID (modulo the processor count) to choose the processor each key is assigned to. Ideally, you want a hash function that will distribute keys to processors in a load-balanced fashion.

In this example the user function is called myhash() and it must have the following interface:

int iproc = myhash(char *key, int keybytes) 

Your function will be passed a key (byte string) and its length in bytes. Typically you want to return an integer such that 0 <= iproc < P, where P is the number of processors. But you can return any integer, since the MR-MPI library uses the result in this manner to assign the key to a processor:

int iproc = myhash(key,keybytes) % P; 
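
As a sketch of the integer-ID case, the function below matches the myhash() interface. It assumes each key is the raw bytes of a single int ID. The names id_hash() and assigned_proc(), and the choice to clear the sign bit, are illustrative and not part of the library.

```cpp
#include <climits>
#include <cstring>

// Hypothetical user hash matching the myhash() interface.
// Assumption: each key holds the raw bytes of one int ID.
int id_hash(char *key, int keybytes) {
  int id = 0;
  // Copy the bytes rather than casting the pointer, since the key
  // is not guaranteed to be aligned for an int. Short keys yield 0.
  if (keybytes >= static_cast<int>(sizeof(int)))
    std::memcpy(&id, key, sizeof(int));
  // Clear the sign bit so a later "% P" cannot produce a negative value.
  return id & INT_MAX;
}

// Mirrors the assignment shown above: hash result modulo the
// processor count P (P must be positive).
int assigned_proc(char *key, int keybytes, int P) {
  return id_hash(key, keybytes) % P;
}
```

Given the signature of aggregate(), such a function would be passed as mr->aggregate(id_hash). With 4 processors, an ID of 10 would be assigned to processor 2 under this scheme.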

Because the aggregate() method will, in general, reassign all key/value pairs to new processors, it incurs a large volume of all-to-all communication. However, this is performed concurrently, taking advantage of the large bisection bandwidth most large parallel machines provide.

The aggregate() method should load-balance key/value pairs across processors if they are initially imbalanced.


Related methods: collate()