
4. Adding Commands to OINK

This section explains how to write new named commands that can be added to OINK, invoked from your input scripts, and that interact appropriately with other OINK commands.

OINK is designed to make this easy to do with a minimum of special coding on your part. Several such named commands are included with the OINK distribution; more will be added over time. See this section of the manual for a list of the current named commands in OINK. We also invite OINK users to send email to the developers with new commands they have written and wish to share, so we can add them to the distribution, attributed to you.

4.1 Source files for the new class
4.2 Methods in the new class
4.3 Calls to the OINK object manager
4.4 Calling back to map() and reduce() functions


4.1 Source files for the new class

In OINK a named command is a child class that derives from the Command parent class (see src/command.cpp and src/command.h), meaning that it contains several methods that can be called by the OINK framework. Adding a new named command to OINK is as simple as writing the code for it in two new files (e.g. foo.cpp and foo.h), dropping them into the src directory, and re-building OINK.

It is easiest to understand the description that follows if you look at an example named command in OINK. In what follows we will use the degree command, contained in src/degree.cpp and src/degree.h for illustration purposes.

The *.h file for a new named command should have lines like these at the top (from the src/degree.h file):

#ifdef COMMAND_CLASS
CommandStyle(degree,Degree)
#else 

CommandStyle(arg1,arg2) is a macro that gets converted by the OINK build procedure into source code. Arg1 is the "name" of the named command, which is how you reference it in your input script, e.g. as

degree -i graphdir -o out/outfile NULL 

Arg2 is the class that implements that command.

The list of all such named commands will appear in the style_command.h file after OINK is (re)built, via a make command.
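The way a CommandStyle() entry can turn into source code is sketched below. This is a self-contained illustration, not OINK's actual build machinery: the stand-in Command class, the hypothetical Foo command, the COMMAND_LIST macro (which plays the role of the generated style_command.h file), and the create_command() function are all assumptions made for the example.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Stand-in for OINK's Command parent class (assumption, not the real header).
struct Command {
  virtual ~Command() {}
  virtual std::string name() const = 0;
};

struct Degree : Command {
  std::string name() const override { return "degree"; }
};

// Hypothetical second command, for illustration only.
struct Foo : Command {
  std::string name() const override { return "foo"; }
};

// Plays the role of the generated style_command.h: one entry per command.
#define COMMAND_LIST           \
  CommandStyle(degree, Degree) \
  CommandStyle(foo, Foo)

// Map a command name from an input script to a new instance of its class.
// Returns nullptr when the name is unknown.
Command *create_command(const char *word) {
  assert(word != nullptr);
#define CommandStyle(key, Class) \
  if (std::strcmp(word, #key) == 0) return new Class();
  COMMAND_LIST
#undef CommandStyle
  return nullptr;
}
```

With this pattern, adding a command only requires a new CommandStyle() entry; the code that matches names to classes does not change.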

The remainder of the *.h file (between the #else and final pair of #endif) is the definition of your new class. Note that you will need to include the mapreduce.h file (from the MR-MPI library src dir) and the MAPREDUCE_NS namespace if your class definition includes any map(), reduce(), etc callback functions since they have the "KeyValue" class name in their prototype.



4.2 Methods in the new class

When a line in the input script starts with a named command, the associated class is instantiated, its params() and run() methods are called, and the instance of the class is destroyed.

The constructor of your new class should set two variables, ninputs and noutputs, which are the number of input and output descriptors it requires you to list in the input script. For the degree command, ninputs = noutputs = 1, as illustrated in the example above.

Your new class is required to define only two methods: params() and run().

Params() is passed the list of arguments following the command name, excluding the "-i" and "-o" arguments, which are processed separately by the parent Command class. Your params() method should parse and check the arguments and generate an error message if the number of arguments is incorrect or any of their values is invalid. Note that the degree command takes 0 arguments.
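As a rough sketch of the parsing and checking described above, consider a hypothetical "foo" command that takes exactly one non-negative integer argument. The FooSettings struct, the parse_foo_params() function, the (narg, arg) argument layout, and the upper bound are assumptions for illustration. A real command would report problems through OINK's own error handling rather than by throwing an exception.

```cpp
#include <cassert>
#include <cstdlib>
#include <stdexcept>

// Hypothetical settings for a hypothetical "foo" command with one argument.
struct FooSettings {
  int threshold;
};

// Parse the arguments that follow the command name ("-i"/"-o" already removed).
// Throws std::runtime_error where a real command would report an OINK error.
FooSettings parse_foo_params(int narg, char **arg) {
  assert(narg == 0 || arg != nullptr);
  if (narg != 1)
    throw std::runtime_error("Illegal foo command: expected 1 argument");

  char *end = nullptr;
  long value = std::strtol(arg[0], &end, 10);
  if (end == arg[0] || *end != '\0')
    throw std::runtime_error("Illegal foo command: threshold is not an integer");
  if (value < 0 || value > 1000000)  // arbitrary bound chosen for this sketch
    throw std::runtime_error("Illegal foo command: threshold out of range");

  FooSettings settings;
  settings.threshold = static_cast<int>(value);
  return settings;
}
```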

Run() is called to invoke the meat of your command and it can perform any series of MapReduce or other operations you wish, using as many MapReduce objects (from the MR-MPI library) and MR-MPI objects (managed by OINK) as you wish. The calls that run() can make to the OINK object manager (obj) are discussed in the next section.

The destructor of your new class should free any memory it has allocated, including any local MapReduce objects that it allocated. Note that this is different from MR-MPI objects and the underlying MapReduce objects they wrap, which are often associated with the input and output descriptors of your command in the input script; those objects are created/destroyed by OINK itself, as discussed in the next section.
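The life cycle described in this section (construct, params(), run(), destroy) can be sketched as follows. The Command class here is a stand-in rather than the real src/command.h, Foo is a hypothetical command, and invoke_foo() only mimics what the framework does for one input-script line. The log vector exists purely to make the call order visible.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for OINK's Command parent class (assumption, not the real header).
class Command {
 public:
  int ninputs, noutputs;
  Command() : ninputs(0), noutputs(0) {}
  virtual ~Command() {}
  virtual void params(int narg, char **arg) = 0;
  virtual void run() = 0;
};

// Hypothetical "foo" command: one input descriptor, one output descriptor.
class Foo : public Command {
 public:
  explicit Foo(std::vector<std::string> &log) : log_(log), scratch_(nullptr) {
    ninputs = 1;
    noutputs = 1;
    log_.push_back("constructor");
  }
  ~Foo() override {
    delete[] scratch_;  // free only what this class allocated itself
    log_.push_back("destructor");
  }
  void params(int narg, char **arg) override {
    (void)arg;
    assert(narg == 0);  // like degree, this sketch takes no arguments
    log_.push_back("params");
  }
  void run() override {
    scratch_ = new int[16];  // stands in for locally allocated workspace
    log_.push_back("run");
  }

 private:
  std::vector<std::string> &log_;
  int *scratch_;
};

// Mimics what the framework does for one input-script line.
std::vector<std::string> invoke_foo() {
  std::vector<std::string> log;
  Command *cmd = new Foo(log);
  cmd->params(0, nullptr);
  cmd->run();
  delete cmd;
  return log;
}
```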



4.3 Calls to the OINK object manager

These are the calls that the run() method of your new class can make to the OINK object manager. Each is discussed below. Note that you make the calls via the "obj" pointer which is visible to your class, e.g. obj->cleanup(). This means you should add the line

#include "object.h" 

at the top of your *.cpp file.

MapReduce *Object::create_mr();
MapReduce *Object::create_mr(int verbosity, int timer,
			     int memsize, int outofcore);
MapReduce *Object::copy_mr(MapReduce *mr);
int Object::permanent(MapReduce *mr); 
MapReduce *Object::input(int index);
MapReduce *Object::input(int index,
			 void (*map1)(int, char *, KeyValue *, void *),
			 void *ptr);
MapReduce *Object::input(int index,
			 void (*map2)(int, char *, int, KeyValue *, void *),
			 void *ptr);
MapReduce *Object::input(int index,
			 void (*map1)(int, char *, KeyValue *, void *),
			 void (*map2)(int, char *, int, KeyValue *, void *),
			 void *ptr); 
void Object::output(int index, MapReduce *mr);
void Object::output(int index, MapReduce *mr,
		    void (*scankv)(char *, int, char *, int, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*scankmv)(char *, int, char *, int, int *, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*map)(uint64_t, char *, int, char *, int, 
				KeyValue *, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*reduce)(char *, int, char *, int, int *, 
				   KeyValue *, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*scankv)(char *, int, char *, int, void *), 
		    void (*scankmv)(char *, int, char *, int, int *, void *),
		    void (*map)(uint64_t, char *, int, char *, int, 
				KeyValue *, void *), 
		    void (*reduce)(char *, int, char *, int, int *,
				   KeyValue *, void *),
		    void *ptr, int disallow); 
void Object::cleanup(); 

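Here is a brief summary of the calls your run() method will typically make:

input(): one call for each input descriptor, to obtain a MapReduce object holding the input data
create_mr() and copy_mr(): as needed, to create temporary MapReduce objects used as workspace
permanent(): as needed, to check whether a MapReduce object is named before altering it
output(): one call for each output descriptor, to write results to a file and/or a named MR-MPI object
cleanup(): one call at the end, to free all temporary MR-MPI objects

The details are discussed below.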


Your run() method should call one of the 4 variants of the input() methods one time for each of its inputs. Which variant it calls depends on what forms of input you wish to support, which is related to the "-i" arguments specified with your named command in the input script and additional options set by the input command.

Each call takes an "index" argument which is the index of the input descriptor being referenced, from 1 to Ninputs. Each call returns a pointer to a MapReduce object which will contain the desired input data as key/value (KV) pairs. As the named command doc page explains, each input descriptor for your command can be specified in the input script as one or more files or directories or as an existing MR-MPI object. For reading files, there are 2 kinds of map() methods that can be used to convert the file contents into KV pairs, one where a filename is passed to your callback function, and the other where a chunk of bytes is passed to your callback function. See the map() method doc page for details.

If you invoke this method:

MapReduce *Object::input(int index); 

then the input descriptor must be specified in your input script as an existing MR-MPI object. No reading of files is allowed.

If you invoke this method:

MapReduce *Object::input(int index,
			 void (*map1)(int, char *, KeyValue *, void *),
			 void *ptr); 

then the input can be specified as either an MR-MPI object or as files which will be processed via the map1() callback function which receives a filename as an argument, so that it can open the file, read it, and generate KV pairs.
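A map1()-style callback might look like the following sketch. The KeyValue class here is a minimal stand-in that mimics only an add(key, keybytes, value, valuebytes) call. The read_edges() name and the "vi vj" edge-per-line file format are assumptions for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the MR-MPI KeyValue class; only add() is mimicked here.
class KeyValue {
 public:
  std::vector<std::pair<std::string, std::string> > pairs;
  void add(char *key, int keybytes, char *value, int valuebytes) {
    pairs.push_back(std::make_pair(std::string(key, keybytes),
                                   std::string(value, valuebytes)));
  }
};

// map1-style callback: receives a filename, reads "vi vj" edges from it,
// and emits one KV pair per edge (key = vi, value = vj).
void read_edges(int itask, char *filename, KeyValue *kv, void *ptr) {
  (void)itask;
  (void)ptr;
  assert(filename != nullptr && kv != nullptr);
  FILE *fp = std::fopen(filename, "r");
  if (fp == nullptr) return;  // a real command would report an error instead

  unsigned long long vi, vj;
  while (std::fscanf(fp, "%llu %llu", &vi, &vj) == 2) {
    uint64_t key = vi, value = vj;
    kv->add(reinterpret_cast<char *>(&key), sizeof(uint64_t),
            reinterpret_cast<char *>(&value), sizeof(uint64_t));
  }
  std::fclose(fp);
}
```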

If you invoke this method:

MapReduce *Object::input(int index,
			 void (*map2)(int, char *, int, KeyValue *, void *),
			 void *ptr); 

then the input can be specified as either an MR-MPI object or as files which will be processed via the map2() callback function which receives a chunk of bytes read from a file as an argument, so that it can convert the byte string into KV pairs. To use this map2() method, you would also need to specify an input command in your input script that sets up various options needed to call the MR-MPI library map() method that uses map2() as a callback function.
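A map2()-style callback, which receives bytes rather than a filename, might be sketched as follows. As before, KeyValue is a minimal stand-in, and parse_edges() and the "vi vj" text format are assumptions. The sketch also assumes each chunk holds whole lines, which is what the options set by the input command are meant to arrange.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the MR-MPI KeyValue class; only add() is mimicked here.
class KeyValue {
 public:
  std::vector<std::pair<std::string, std::string> > pairs;
  void add(char *key, int keybytes, char *value, int valuebytes) {
    pairs.push_back(std::make_pair(std::string(key, keybytes),
                                   std::string(value, valuebytes)));
  }
};

// map2-style callback: receives a chunk of bytes read from a file and
// converts each "vi vj" record in it into one KV pair.
void parse_edges(int itask, char *str, int size, KeyValue *kv, void *ptr) {
  (void)itask;
  (void)ptr;
  assert(str != nullptr && size >= 0 && kv != nullptr);
  std::string chunk(str, size);
  std::istringstream in(chunk);
  uint64_t vi, vj;
  while (in >> vi >> vj) {
    kv->add(reinterpret_cast<char *>(&vi), sizeof(uint64_t),
            reinterpret_cast<char *>(&vj), sizeof(uint64_t));
  }
}
```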

If you invoke the 4th variant:

MapReduce *Object::input(int index,
			 void (*map1)(int, char *, KeyValue *, void *),
			 void (*map2)(int, char *, int, KeyValue *, void *),
			 void *ptr); 

then both kinds of map() callback functions can be specified, map1() and map2(), and OINK will select which to use depending on what options have been set up via the input command for this input descriptor.

Note that you have to provide the map1() and/or map2() callback functions to the input() calls, with the correct prototype. As discussed below and on this doc page, they can be static methods in your class, or they can be map() methods in separate files in the OINK src directory, which are named map_*.cpp.

Also note that if you want to provide maximum flexibility for using your named command, then you should provide both flavors of callback map() functions for allowing input from files along with input from an existing MR-MPI object. If you provide neither callback, or just one of the two, then input scripts will be limited in what forms of input descriptor they can define.


Your run() method should call one of the 6 variants of the output() methods one time for each of its outputs. Which variant it calls depends on what forms of output you wish to support, which is related to the "-o" arguments specified with your named command in the input script and additional options set by the output command.

Each call takes an "index" argument which is the index of the output descriptor being referenced, from 1 to Noutputs. Each call also takes a MapReduce object pointer "mr", which contains the data you wish to output. As the named command doc page explains, each output descriptor for your command is specified in the input script with 2 parts, either of which can be NULL. The first part is a filename for writing output to files. The second part is the ID of an MR-MPI object which will contain the output. For writing files, there are 4 kinds of callback methods that can be used to write the contents of "mr" to a file. Each of these 4 methods is called with a "FILE *" as its final "void *" argument. This is the file pointer to a file created and opened (and later closed) by OINK which the callback method can write its data to. If you pass your own non-NULL pointer to the callback method via the "void *ptr" argument to the output() calls, then it will be appended to the FILE *, so that it can be dereferenced as a 2nd pointer passed to the callback function.

If you invoke this method:

void Object::output(int index, MapReduce *mr); 

then the output descriptor must be specified in your input script as only defining a MR-MPI object for output. No writing to files is allowed. This call will assign the ID specified in your input script to the MR-MPI object that wraps "mr". Also note that this will remove the ID from any other MR-MPI object that has the same ID. They then become unnamed or temporary MR-MPI objects which will be deleted at the end of your run() method. See further discussion of temporary versus permanent MR-MPI objects in the next section.

If you invoke one of these 2 methods:

void Object::output(int index, MapReduce *mr,
		    void (*scankv)(char *, int, char *, int, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*scankmv)(char *, int, char *, int, int *, void *), 
		    void *ptr, int disallow); 

then the output can be specified as either an MR-MPI object or as files which will be written to via the scankv() or scankmv() callback functions respectively. In the first case, the scankv() function will receive key/value (KV) pairs, one at a time from the "mr" MapReduce object. In the second case, the scankmv() function will receive key/multivalue (KMV) pairs, one at a time from the "mr" MapReduce object. The MapReduce object will be unaltered by this operation. See the scan() method doc page in the MR-MPI library for details. The "disallow" flag is explained below.
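A scankv()-style output callback could be sketched like this. It assumes that output() was called with a NULL "ptr", so the final void * argument is the FILE * opened by OINK, and that each key and value holds one uint64_t. The print_edge() name and the output format are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>

// scankv-style callback: writes one "vi vj" line per KV pair.
// Assumes key and value each hold a single uint64_t, and that the final
// void * argument is the FILE * that OINK opened for this output.
void print_edge(char *key, int keybytes, char *value, int valuebytes,
                void *ptr) {
  assert(keybytes == static_cast<int>(sizeof(uint64_t)));
  assert(valuebytes == static_cast<int>(sizeof(uint64_t)));
  FILE *fp = static_cast<FILE *>(ptr);
  uint64_t vi, vj;
  std::memcpy(&vi, key, sizeof(uint64_t));  // avoid unaligned access
  std::memcpy(&vj, value, sizeof(uint64_t));
  std::fprintf(fp, "%llu %llu\n", static_cast<unsigned long long>(vi),
               static_cast<unsigned long long>(vj));
}
```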

If you invoke one of these 2 methods:

void Object::output(int index, MapReduce *mr,
		    void (*map)(uint64_t, char *, int, char *, int, 
				KeyValue *, void *), 
		    void *ptr, int disallow);
void Object::output(int index, MapReduce *mr,
		    void (*reduce)(char *, int, char *, int, int *, 
				   KeyValue *, void *), 
		    void *ptr, int disallow); 

then the output can be specified as either an MR-MPI object or as files which will be written to via the map() or reduce() callback functions respectively. In the first case, the map() function will receive key/value (KV) pairs, one at a time from the "mr" MapReduce object. In the second case, the reduce() function will receive key/multivalue (KMV) pairs, one at a time from the "mr" MapReduce object. For the first case, the MapReduce object will typically be unaltered by this operation, since the MR-MPI library map() method is called with addflag=1, so that the existing KV pairs are preserved. But your map() callback function should not emit any new KV pairs. For the second case, the MapReduce object will be altered by this operation, since the MR-MPI library reduce() method deletes the KMV pairs and replaces them with new KV pairs which your reduce() callback function may or may not emit. The "disallow" flag is explained below.

If you invoke the 6th variant:

void Object::output(int index, MapReduce *mr,
		    void (*scankv)(char *, int, char *, int, void *), 
		    void (*scankmv)(char *, int, char *, int, int *, void *),
		    void (*map)(uint64_t, char *, int, char *, int, 
				KeyValue *, void *), 
		    void (*reduce)(char *, int, char *, int, int *,
				   KeyValue *, void *),
		    void *ptr, int disallow); 

then any of the 4 kinds of callback functions can be specified, namely scankv(), scankmv(), map(), or reduce(). Those that you do not wish to provide or that are not compatible with the current state of the MapReduce object "mr" (which will contain either KV or KMV pairs, but not both), can be specified as NULL.

Note that you have to provide each of these 4 callback functions to the output() calls, with the correct prototype. As discussed below and on this doc page, they can be static methods in your class, or they can be methods in separate files in the OINK src directory, which are named scan_*.cpp, map_*.cpp, and reduce_*.cpp respectively.

Also note that if you want to provide maximum flexibility for using your named command, then you should provide at least one flavor of a callback function for allowing output to files along with output to an MR-MPI object. If you do not do this, then input scripts will be limited in what forms of output descriptor they can define.

All but the first of the output() variants can be called with an optional disallow flag which is set to 0 by default. If these methods are called with disallow=1, then no output to an MR-MPI object is allowed. This is useful if you expect the run() method of your named command to subsequently change the data stored in the MapReduce object, and thus make the data written to an output file differ from what is stored in the MapReduce object.


Your run() method may need to use additional MapReduce objects as workspace, in addition to its inputs. Some of these may end up holding the data you wish to output.

One key point to understand is that the OINK object manager keeps track of two kinds of MR-MPI objects, each of which is a thin wrapper on MapReduce objects which hold your key/value (KV) or key/multivalue (KMV) data. Each MR-MPI object can be "permanent" meaning it has an ID which can be referenced by input script commands. Or it can be "temporary", meaning it has no ID and was created to hold data input from a file or by the function calls discussed below. Permanent MR-MPI objects persist until they are explicitly deleted by your input script. Temporary MR-MPI objects are deleted at the end of your run() method; they can be thought of as workspace created and used by your run() method.

These two calls create a new temporary MR-MPI object and return a pointer to the MapReduce object contained within it:

MapReduce *Object::create_mr();
MapReduce *Object::create_mr(int verbosity, int timer,
			     int memsize, int outofcore); 

The first variant will use the default settings for the MapReduce object; see the set command and the settings doc page of the MR-MPI library for details. The second variant allows you to override a few of the settings with specified values.

This call makes a copy of the "mr" MapReduce object, wraps it in a new temporary MR-MPI object, and returns a pointer to the new MapReduce object:

MapReduce *Object::copy_mr(MapReduce *mr); 

There are two reasons to create new and copied MapReduce objects via these calls, rather than directly invoking MR-MPI library calls within your run() method. I.e. two reasons to do one of these:

MapReduce *mr = obj->create_mr();
MapReduce *mr2 = obj->copy_mr(mr);

instead of one of these:

MapReduce *mr = new MapReduce();
MapReduce *mr2 = mr->copy(); 

The first reason is that OINK will manage the memory associated with the new MR-MPI objects and free them for you at the end of your run() method; see the cleanup() method discussion below. The second is that you can assign an ID to these temporary MR-MPI objects via the output() calls discussed above, which you cannot do if you create the MapReduce object directly yourself. I.e. you cannot pass to an output() method a pointer to a MapReduce object that you allocated yourself if that operation will assign an ID (specified in your input script) to the MR-MPI object.

You are of course free to create additional MapReduce objects yourself via direct calls to the MR-MPI library. In this case you should ensure you free the objects yourself before the run() method ends, so as not to leak memory.

One additional point is that it is fine to do this within your run() method, where mr is a pointer returned by obj->create_mr():

mr = obj->copy_mr(mr);

whereas you should not do this:

mr = mr->copy(); 

The former simply overwrites the local mr pointer, but OINK will manage and free the memory if necessary for the underlying MapReduce objects associated with both the original and new mr pointers. The latter will leak memory since the underlying MapReduce object associated with the original mr pointer is lost.
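The difference between the two patterns can be made concrete with a small model of the ownership rules. Both classes below are stand-ins written for this sketch, not the real MR-MPI or OINK classes. MapReduce only counts live instances, and Object only tracks the temporaries it hands out and frees them in cleanup().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the MR-MPI MapReduce class; it only counts live instances.
struct MapReduce {
  static int live;
  MapReduce() { ++live; }
  MapReduce(const MapReduce &) { ++live; }
  ~MapReduce() { --live; }
  MapReduce *copy() { return new MapReduce(*this); }
};
int MapReduce::live = 0;

// Stand-in for the OINK object manager: it tracks every temporary it creates.
class Object {
 public:
  MapReduce *create_mr() { return track(new MapReduce()); }
  MapReduce *copy_mr(MapReduce *mr) { return track(mr->copy()); }
  void cleanup() {
    for (std::size_t i = 0; i < temps_.size(); i++) delete temps_[i];
    temps_.clear();
  }

 private:
  std::vector<MapReduce *> temps_;
  MapReduce *track(MapReduce *mr) { temps_.push_back(mr); return mr; }
};

// Managed pattern: overwriting the local pointer is safe, because the
// manager still tracks the original. Returns the number of leaked objects.
int leaked_by_managed_copy() {
  const int before = MapReduce::live;
  Object obj;
  MapReduce *mr = obj.create_mr();
  mr = obj.copy_mr(mr);
  (void)mr;
  assert(MapReduce::live - before == 2);
  obj.cleanup();
  return MapReduce::live - before;
}

// Unmanaged pattern: after mr = mr->copy() nothing refers to the original.
int leaked_by_direct_copy() {
  const int before = MapReduce::live;
  MapReduce *original = new MapReduce();
  MapReduce *mr = original;
  mr = mr->copy();  // the pattern warned against above
  delete mr;        // frees only the copy
  const int leaked = MapReduce::live - before;
  delete original;  // demo only: real code would no longer hold this pointer
  return leaked;
}
```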

This call is useful for checking whether a MR-MPI object has been assigned a name or not, when it was used for input or output:

int Object::permanent(MapReduce *mr); 

It is called using a MapReduce object pointer and returns a 1 if the associated MR-MPI object that wraps it has a name, and a 0 if it does not. There are two uses for this call.

First, it can be used after an input() call to determine whether the input was done from a file or an existing MR-MPI object. In the former case permanent() will return 0, since the new MR-MPI object holding the data is unnamed. In the latter case it will return 1, since the MR-MPI object holding the data was named in your input script as one of the "-i" arguments to the named command. If the run() method will subsequently alter the MapReduce object and it is permanent, you can make a copy of it, so as to not alter the original.

Second, it can be used after an output() call to determine whether the MapReduce object was assigned a name. This will be the case if a MR-MPI ID was specified in your input script as one of the "-o" arguments to the named command. If this is the case, you typically do not want to alter the data in the MapReduce object after outputting it. If you wish to further process the data, you can make a copy.
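The "copy it if it is permanent" idiom from both uses can be sketched as a small helper. The MapReduce and Object classes below are stand-ins for this example only. create_named() is a hypothetical way to model an object named in the input script, and writable() is a hypothetical helper name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the MR-MPI MapReduce class, holding a single int as its data.
struct MapReduce {
  int data;
  MapReduce() : data(0) {}
  MapReduce *copy() { return new MapReduce(*this); }
};

// Stand-in for the OINK object manager; tracks named and unnamed objects.
class Object {
 public:
  ~Object() {
    for (std::size_t i = 0; i < entries_.size(); i++) delete entries_[i].mr;
  }
  // Hypothetical: models an MR-MPI object that has an ID in the input script.
  MapReduce *create_named() { return add(new MapReduce(), true); }
  MapReduce *copy_mr(MapReduce *mr) { return add(mr->copy(), false); }
  int permanent(MapReduce *mr) const {
    for (std::size_t i = 0; i < entries_.size(); i++)
      if (entries_[i].mr == mr) return entries_[i].named ? 1 : 0;
    return 0;
  }

 private:
  struct Entry { MapReduce *mr; bool named; };
  std::vector<Entry> entries_;
  MapReduce *add(MapReduce *mr, bool named) {
    Entry e = {mr, named};
    entries_.push_back(e);
    return mr;
  }
};

// Return an object that run() may safely alter: a temporary copy when the
// original is permanent, otherwise the original itself.
MapReduce *writable(Object *obj, MapReduce *mr) {
  assert(obj != nullptr && mr != nullptr);
  return obj->permanent(mr) ? obj->copy_mr(mr) : mr;
}
```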

Finally, this method should be called at the end of your run() method to free all the temporary MR-MPI objects stored by OINK, and perform other internal cleanup:

void Object::cleanup(); 

4.4 Calling back to map() and reduce() functions

Your run() method will typically invoke various methods from the MR-MPI library which involve callback functions, e.g. for performing map() or reduce() operations.

The MR-MPI library manual discusses the general rules for passing a pointer to a callback function to a MR-MPI library method. Since you will be doing this from within the class that encodes your named command you have two choices.

First, you can pass a pointer to a static function declared within your class. This function cannot directly access any class variables, but you can pass it the "this" pointer for the class (as the void * argument to the map() or reduce() function) which the callback function can use to access class variables indirectly, through that pointer. If you do this, then the map() and reduce() methods defined in your class can only be used by that named command.
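The static-function approach can be illustrated with a self-contained sketch. Here scan_pairs() is a stand-in for an MR-MPI library method that invokes a callback once per KV pair, and the Foo class, its members, and the tally() callback are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

typedef void (*ScanKVFn)(char *, int, char *, int, void *);

// Stand-in for an MR-MPI method that invokes a callback once per KV pair.
void scan_pairs(uint64_t *keys, uint64_t *values, int n, ScanKVFn fn,
                void *ptr) {
  for (int i = 0; i < n; i++)
    fn(reinterpret_cast<char *>(&keys[i]), static_cast<int>(sizeof(uint64_t)),
       reinterpret_cast<char *>(&values[i]),
       static_cast<int>(sizeof(uint64_t)), ptr);
}

// Hypothetical command class that counts values above a threshold.
class Foo {
 public:
  Foo() : threshold_(10), nlarge_(0) {}
  int count_large(uint64_t *keys, uint64_t *values, int n) {
    nlarge_ = 0;
    scan_pairs(keys, values, n, tally, this);  // pass "this" as the void *
    return nlarge_;
  }

 private:
  uint64_t threshold_;
  int nlarge_;

  // Static callback: no implicit "this", so class state is reached via ptr.
  static void tally(char *key, int keybytes, char *value, int valuebytes,
                    void *ptr) {
    (void)key;
    (void)keybytes;
    assert(valuebytes == static_cast<int>(sizeof(uint64_t)));
    Foo *self = static_cast<Foo *>(ptr);
    uint64_t v;
    std::memcpy(&v, value, sizeof(v));
    if (v > self->threshold_) self->nlarge_++;
  }
};
```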

An alternative is to put your callback functions in their own files, named map_*.cpp for map() functions, reduce_*.cpp for reduce() functions, compare_*.cpp for compare() functions, hash_*.cpp for hash() functions, and scan_*.cpp for scan() functions. By doing this the callback functions can be used by any named command or as arguments to the "MR-MPI library commands" used in an input script to invoke the MR-MPI library methods directly. See the oink/rmat.cpp file, which implements the rmat command, for an example of a named command which accesses several of its callback functions in this manner.

Each map_*.cpp file (and reduce_*.cpp, compare_*.cpp, etc) can contain one or more map() (reduce(), compare(), etc) callback functions. These are not class methods, but stand-alone functions. See examples in the oink directory. The header files that contain the prototypes for these functions are named style_map.h, style_reduce.h, etc and are auto-generated when OINK is built. Your named command class, e.g. rmat.cpp, simply needs to include these style header files in order to use any of the callback functions in OINK. Likewise, any callback function included in one of these files can be accessed by name in your input script when using one of the MR-MPI library commands. Documentation for the collection of map(), reduce(), etc functions is also auto-extracted and included in this section of the OINK documentation. Instructions on how to pass generic pointers to the callback functions are also discussed in this section.

It is also possible in the run() method of your named command to select a callback function based on an input script parameter to your command. For example, the input script could list the name of a particular compare() function you wish to use to sort the data in a MapReduce object. By calling the appropriate lookup() method in the MRMPI class (oink/mrmpi.cpp), the parameter string can be converted into a matching function pointer. For example, consider these lines of code:

MapReduce *mr = obj->create_mr();
...
CompareFnPtr compare = compare_lookup(userparam);
mr->sort_keys(compare); 

In this example "userparam" is a string, listed in the input script as a command parameter, which contains a function name, e.g. mySpecialCompare. Assuming that function is included in OINK in a compare_*.cpp file, the compare_lookup() method will be able to match the string to the function and return a pointer to the function which can then be used as an argument to the sort_keys() MR-MPI library method.
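One simple way such a name-to-function lookup can work is a table search, sketched below. This is not the OINK implementation. The CompareFnPtr typedef is assumed to match the MR-MPI compare() callback prototype, compare_bytes() and compare_length() are hypothetical callbacks, and this compare_lookup() is a stand-in that returns nullptr for unknown names.

```cpp
#include <cassert>
#include <cstring>

// Assumed to match the compare() callback prototype of the MR-MPI library.
typedef int (*CompareFnPtr)(char *, int, char *, int);

// Hypothetical callback: order keys byte-wise, shorter key first on a tie.
int compare_bytes(char *a, int alen, char *b, int blen) {
  const int n = alen < blen ? alen : blen;
  const int c = std::memcmp(a, b, n);
  if (c != 0) return c < 0 ? -1 : 1;
  return (alen > blen) - (alen < blen);
}

// Hypothetical callback: order keys by length only.
int compare_length(char *a, int alen, char *b, int blen) {
  (void)a;
  (void)b;
  return (alen > blen) - (alen < blen);
}

struct CompareEntry {
  const char *name;
  CompareFnPtr fn;
};

// Stand-in for the auto-generated list of compare() functions.
static const CompareEntry compare_table[] = {
    {"compare_bytes", compare_bytes},
    {"compare_length", compare_length},
};

// Convert a function name from the input script into a function pointer.
CompareFnPtr compare_lookup(const char *name) {
  assert(name != nullptr);
  const int n = sizeof(compare_table) / sizeof(compare_table[0]);
  for (int i = 0; i < n; i++)
    if (std::strcmp(name, compare_table[i].name) == 0)
      return compare_table[i].fn;
  return nullptr;  // unknown name; a real command would report an error
}
```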

The definition of CompareFnPtr and all other callback function pointers is in the "typedefs.h" file, which can be included at the top of your named command *.cpp file.