Pig's UDF

It is time to turn our attention to how you can extend Pig. So far we have looked at the operators and functions Pig provides. But Pig also makes it easy for you to add your own processing logic via User Defined Functions (UDFs). These are written in Java and, starting with version 0.8, in Python. This chapter will walk through how you can build evaluation functions, UDFs that operate on single elements of data or collections of data. It will also cover how to write filter functions, UDFs that can be used as part of filter statements.

UDFs are powerful tools, and thus the interfaces are somewhat complex. In designing Pig, our goal was to make easy things easy and hard things possible. So, the simplest UDFs can be implemented in a single method, but you will have to implement a few more methods to take advantage of more advanced features.

Throughout this chapter we will use several running examples of UDFs. Some of these are built-in Pig UDFs, which can be found in your Pig distribution at src/org/apache/pig/builtin/. The others can be found on GitHub with the other example UDFs, in the directory udfs.

Writing an Evaluation Function in Java

Pig and Hadoop are implemented in Java, and so it is natural to implement UDFs in Java. This allows UDFs access to the Hadoop APIs and to many of Pig's facilities.

Before diving into the details, it is worth considering names. Pig locates a UDF by looking for a Java class that exactly matches the UDF name in the script. For details on where it looks, see "Registering UDFs" and "define and UDFs". There is not an accepted standard on whether UDF names should be all uppercase, camelCased (e.g., MyUdf), or all lowercase. Even the built-in UDFs provided by Pig vary in this regard. Keep in mind that, whatever you choose, you and all of the users of your UDF will have a better user experience if you make the name short, easy to remember, and easy to type.

Where Your UDF Will Run

Writing code that will run in a parallel system presents challenges. A separate instance of your UDF will be constructed and run in each map or reduce task. It is not possible to share state across these instances because they may not all be running at the same time. There will be only one instance of your UDF per map or reduce task, so you can share state within that context. (This assumes your script references the UDF only once; each reference to a UDF in a script becomes a separate instance on the backend, even if the references end up in the same map or reduce task.)

When writing code for a parallel system, you must remember the power of parallelism. Operations that are acceptable in serial programs may no longer be advisable. Consider a UDF that, when it first starts, connects to a database server to download a translation table. In a serial or low-parallelism environment, this is a reasonable approach. But if you have 10,000 map tasks in your job and they all connect to your database at once, you will most likely hear from your DBA, and the conversation is unlikely to be pleasant.

In addition to an instance in each task, Pig will construct an instance of your UDF on the frontend during the planning stage. It does this for a couple of reasons. One, it wants to test early that it can construct your UDF; it would rather fail during planning than at runtime. Two, as we will cover later in this chapter, it will ask your UDF some questions about schemas and types it accepts as part of the execution planning. It will also give your UDF a chance to store information it wants to make available to the instances of itself that will be run in the backend.

Evaluation Function Basics

All evaluation functions extend the Java class org.apache.pig.EvalFunc. This class uses Java generics and is parameterized by the return type of your UDF. The core method in this class is exec, which is invoked for every record that passes through your execution pipeline, taking one record and returning one result. As input it takes a tuple, which contains all of the fields the script passes to your UDF. It returns the type by which you parameterized EvalFunc. For simple UDFs, this is the only method you need to implement. The following code gives an example of a UDF that raises an integer to an integral power and returns a long result:

// java/com/acme/math/Pow.java
/**
 * A simple UDF that takes a value and raises it to the power of a second
 * value.  It can be used in a Pig Latin script as Pow(x, y), where x and y
 * are both expected to be ints.
 */
public class Pow extends EvalFunc<Long> {

    public Long exec(Tuple input) throws IOException {
        try {
            /* Rather than give you explicit arguments, UDFs are always handed
             * a tuple.  The UDF must know the arguments it expects and pull
             * them out of the tuple.  These next two lines get the first and
             * second fields out of the input tuple that was handed in.  Since
             * Tuple.get returns Objects, we must cast them to Integers.  If
             * the cast fails, an exception will be thrown.
             */
            int base = (Integer)input.get(0);
            int exponent = (Integer)input.get(1);
            long result = 1;

            /* Probably not the most efficient method...*/
            for (int i = 0; i < exponent; i++) {
                long preresult = result;
                result *= base;
                if (preresult > result) {
                    // We overflowed.  Give a warning, but do not throw an
                    // exception.
                    warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
                    // Returning null will indicate to Pig that we failed but
                    // we want to continue execution.
                    return null;
                }
            }
            return result;
        } catch (Exception e) {
            // Throwing an exception will cause the task to fail.
            throw new IOException("Something bad happened!", e);
        }
    }
}

EvalFunc is also used to implement aggregation functions. Because the group operator returns a record for each group, with a bag containing all the records in that group, your eval func still takes one record and returns one record. As an example of this, let's take a look at the implementation of exec in Pig's COUNT function. Some of the error-handling code has been removed for ease of reading:

// src/org/apache/pig/builtin/COUNT.java
public Long exec(Tuple input) throws IOException {
  try {
    // The data bag is passed to the UDF as the first element of the
    // tuple.
    DataBag bag = (DataBag)input.get(0);
    Iterator it = bag.iterator();
    long cnt = 0;
    while (it.hasNext()){
      Tuple t = (Tuple)it.next();
      // Don't count nulls or empty tuples
      if (t != null && t.size() > 0 &&
          t.get(0) != null) {
        cnt++;
      }
    }
    return cnt;
  } catch (Exception e) {
      ...
  }
}

Just as UDFs can take complex types as input, they also can return complex types as output. You could, for example, create a SetIntersection UDF that took two bags as input and returned a bag as output.
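
Such a UDF is not among Pig's built-ins, but a minimal sketch might look like the following. The class name, the use of tuple equality to define set membership, and the choice to hold the first bag's contents in memory are all illustrative decisions, not a prescribed implementation:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class SetIntersection extends EvalFunc<DataBag> {

    private BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 2) return null;
        DataBag left = (DataBag)input.get(0);
        DataBag right = (DataBag)input.get(1);
        if (left == null || right == null) return null;

        // Remember the tuples of the first bag, then keep the tuples of the
        // second bag that also appear in the first.  This relies on tuple
        // equality and holds the first bag's contents in memory, so it is
        // only reasonable for modestly sized bags.
        Set<Tuple> seen = new HashSet<Tuple>();
        for (Tuple t : left) {
            seen.add(t);
        }

        DataBag result = bagFactory.newDefaultBag();
        for (Tuple t : right) {
            if (seen.contains(t)) {
                result.add(t);
            }
        }
        return result;
    }
}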

UDFs can also be handed the entire record by passing * to the UDF. You might expect that in this case the input Tuple argument passed to the UDF would contain all the fields passed into the operator the UDF is in. But it does not. Instead, it contains one field, which is a tuple that contains all those fields. Consider a Pig Latin script like this:

data      = load 'input' as (x, y, z);
processed = foreach data generate myudf(*);

In this case, myudf.exec will get a tuple with one field, which will be a tuple that will have three fields: x, y, and z. To access the y field of data, you will need to call t.get(0).get(1).
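
In Java that nested access requires a cast, because get returns Object. The following is a hedged sketch of what myudf's exec might look like (the class name and the decision to return y as a string are illustrative):

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class MyUdf extends EvalFunc<String> {

    public String exec(Tuple input) throws IOException {
        // The single field handed in is itself a tuple holding (x, y, z),
        // so it must be cast to Tuple before its fields can be read.
        Tuple record = (Tuple)input.get(0);
        Object y = record.get(1);        // the y field of data
        return y == null ? null : y.toString();
    }
}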

Interacting with Pig values

Evaluation functions and other UDFs are exposed to the internals of how Pig represents data types. This means that when you read a field and expect it to be an integer, you need to know that it will be an instance of java.lang.Integer. For a complete list of Pig types and how they are represented in Java, see "Types". For most of these types, you construct the appropriate Java objects in the normal way. However, this is not the case for tuples and bags. These are interfaces, and they do not have direct constructors. Instead, you must use factory classes for each of these. This was done so that users and developers could build their own implementations of tuple and bag and instruct Pig to use them.

TupleFactory is an abstract singleton class that you must use to create tuples. You can also configure which TupleFactory is used, since users who provide their own tuples will need to provide their own factory to produce them. To get an instance of TupleFactory to construct tuples, call the static method TupleFactory.getInstance().

You can now create new tuples with either newTuple() or newTuple(int size). Whenever possible you should use the second method, which preallocates the tuple with the right number of fields. This avoids the need to dynamically grow the tuple later and is much more efficient. The method creates a tuple with size number of fields, all of which are null. You can now set the fields using the Tuple's set(int fieldNum, Object val) method. As an example, we can look at how the example load function we will build in the next chapter creates tuples:

// JsonLoader.java
private TupleFactory tupleFactory = TupleFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
        ResourceSchema s = field.getSchema();
        ResourceFieldSchema[] fs = s.getFields();
        Tuple t = tupleFactory.newTuple(fs.length);

        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }
    ...
}

If you do not know the number of fields in the tuple when it is constructed, you can use newTuple(). You can then add fields using Tuple's append(Object val) method, which will append the field to the end of the tuple.

To read data from tuples, use the get(int fieldNum) method. This returns a Java Object because the tuple does not have a schema instance and does not know what type this field is. You must either cast the result to the appropriate type or use the utility methods in org.apache.pig.data.DataType to determine the type.
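
As a small sketch of that pattern, here is an illustrative helper that reads a field and returns it as a Long whatever numeric type it arrived as; DataType.findType and DataType.toLong are the Pig utilities being demonstrated, while the class and method names are assumptions:

import java.io.IOException;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

public class FieldAccess {

    // Return the requested field as a Long, converting if necessary.
    public static Long readAsLong(Tuple t, int fieldNum) throws IOException {
        Object val = t.get(fieldNum);
        if (val == null) return null;
        if (DataType.findType(val) == DataType.LONG) {
            return (Long)val;            // already the type we expected
        }
        // Fall back to Pig's conversion utility for other numeric types.
        return DataType.toLong(val);
    }
}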

Similar to tuples, BagFactory must be used to construct bags. You can get an instance using BagFactory.getInstance(). To get a new, empty bag, call newDefaultBag(). You can then add tuples to it as you construct them using DataBag's add(Tuple t) method. You should do this rather than constructing a list of tuples and then passing it using newDefaultBag(List listOfTuples), because bags know how to spill to disk when they grow so large that they cannot fit into memory. Again we can look at JsonLoader to see an example of constructing bags:

// JsonLoader.java
private BagFactory bagFactory = BagFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
        DataBag bag = bagFactory.newDefaultBag();

        JsonToken innerTok;
        while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {

            t = tupleFactory.newTuple(fs.length);
            for (int j = 0; j < fs.length; j++) {
                t.set(j, readField(p, fs[j], j));
            }

            p.nextToken(); // read end of object
            bag.add(t);
        }
    ...
}

To read data from a bag, use the iterator provided by iterator(). This also implements Java's Iterable, so you can use the construct for (Tuple t : bag).

Bags make the assumption that once data is being read from them, no new data will be written to them. Their implementation of how they spill and reread data depends on this assumption. So once you call iterator, you should never call add again on the same bag.
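
A small sketch of that discipline, written as it might appear inside an exec method (the tuple contents are illustrative):

TupleFactory tupleFactory = TupleFactory.getInstance();
DataBag bag = BagFactory.getInstance().newDefaultBag();

// Write phase: add every tuple before any reads happen.
for (int i = 0; i < 10; i++) {
    Tuple t = tupleFactory.newTuple(1);
    t.set(0, i);
    bag.add(t);
}

// Read phase: once iteration starts, do not call add() on this bag again.
long sum = 0;
for (Tuple t : bag) {
    sum += (Integer)t.get(0);
}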

Input and Output Schemas

Pig typechecks a script before running it. EvalFunc includes a method to allow you to turn on type checking for your UDF as well, both for input and output.

When your UDF returns a simple type, Pig uses Java reflection to determine the return type. However, because exec takes a tuple, Pig has no way to determine what input you expect your UDF to take. You can check this at runtime, of course, but your development and testing will go more smoothly if you check it at compile time instead. For example, we could use the Pow UDF example in the previous section like this:

register 'acme.jar';
A = load 'input' as (x:chararray, y:int);
B = foreach A generate y, com.acme.math.Pow(x, 2);
dump B;

Pig will start a job and run your tasks. All the tasks will fail, and you will get an error message ERROR 2078: Caught error from UDF: com.acme.math.Pow [Something bad happened!]. Runtime exceptions like this are particularly expensive in Hadoop, both because scheduling can take a while on a busy cluster and because each task is tried three times before the whole job is declared a failure. Let’s fix this UDF so it checks up front that it was given reasonable input.

The method to declare the input your UDF expects is outputSchema. The method is called this because it returns the schema that describes the UDF’s output. If your UDF does not override this method, Pig will attempt to ascertain your return type from the return type of your implementation of EvalFunc, and pass your UDF whatever input the script indicates. If your UDF does implement this method, Pig will pass it the schema of the input that the script has indicated to pass into the UDF. This is also your UDF’s opportunity to throw an error if it receives an input schema that does not match its expectations. An implementation of this method for Pow looks like this:

// java/com/acme/math/Pow.java
public Schema outputSchema(Schema input) {
    // Check that we were passed two fields
    if (input.size() != 2) {
        throw new RuntimeException(
            "Expected (int, int), input does not have 2 fields");
    }

    try {
        // Get the types for both columns and check them.  If they are
        // wrong, figure out what types were passed and give a good error
        // message.
        if (input.getField(0).type != DataType.INTEGER ||
                input.getField(1).type != DataType.INTEGER) {
            String msg = "Expected input (int, int), received schema (";
            msg += DataType.findTypeName(input.getField(0).type);
            msg += ", ";
            msg += DataType.findTypeName(input.getField(1).type);
            msg += ")";
            throw new RuntimeException(msg);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Construct our output schema, which is one field that is a long
    return new Schema(new FieldSchema(null, DataType.LONG));
}

With this method added to Pow, when we invoke the previous script that mistakenly tries to pass a chararray to Pow, it now fails almost immediately with java.lang.RuntimeException: Expected input (int, int), received schema (chararray, int).

Pig’s Schema is a complicated class, and we will not delve into all its complexities here. The following summary will be enough to help you build your own schemas for outputSchema. At its core, Schema is a list of FieldSchemas and a mapping of aliases to FieldSchemas. Each FieldSchema contains an alias and a type. The types are stored as Java bytes, with constants for each type defined in the class org.apache.pig.data.DataType. Schema is a recursive structure. Each FieldSchema also has a Schema member. This member is nonnull only when the type is complex. In the case of tuples, it defines the schema of the tuple. In the case of bags, it defines the schema of the tuples in the bag. Starting in 0.9, if a schema is present for a map, it indicates the data type of values in the map. Before 0.9, maps did not have schemas:

public class Schema implements Serializable, Cloneable {

    // List of all fields in the schema.
    private List<FieldSchema> mFields;

    // Map of alias names to field schemas, so that lookup can be done by alias.
    private Map<String, FieldSchema> mAliases;

    // A FieldSchema represents a schema for one field.
    public static class FieldSchema implements Serializable, Cloneable {

         // Alias for this field.
        public String alias;

         // Datatype, using codes from org.apache.pig.data.DataType.
        public byte type;

        // If this is a tuple itself, it can have a schema. Otherwise, this field
        // must be null.
        public Schema schema;

        /**
         * Constructor for any type.
         * @param a Alias, if known. If unknown, leave null.
         * @param t Type, using codes from org.apache.pig.data.DataType.
         */
        public FieldSchema(String a, byte t) { ... }
    }

    /**
     * Create a schema with more than one field.
     * @param fields List of field schemas that describes the fields.
     */
    public Schema(List<FieldSchema> fields) { ... }

    /**
     * Create a schema with only one field.
     * @param fieldSchema field to put in this schema.
     */
    public Schema(FieldSchema fieldSchema) { ... }
 
    /**
     * Given an alias name, find the associated FieldSchema.
     * @param alias Alias to look up.
     * @return FieldSchema, or null if no such alias is in this tuple.
     */
    public FieldSchema getField(String alias) throws FrontendException { 
        // some error checking omitted.
        return mAliases.get(alias);
    }

    /**
     * Given a field number, find the associated FieldSchema.
     *
     * @param fieldNum Field number to look up.
     * @return FieldSchema for this field.
     */
    public FieldSchema getField(int fieldNum) throws FrontendException {
        // some error checking omitted.
        return mFields.get(fieldNum);
    }
}

As mentioned earlier, when your UDF returns a scalar type, Pig can use reflection to figure out that return type. When your UDF returns a bag or a tuple, however, you will need to implement outputSchema if you want Pig to understand the contents of that bag or tuple.
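
For instance, a UDF that returns a bag of tuples, each with a single long field, might implement outputSchema along these lines. The aliases are illustrative, and, as with the other excerpts in this chapter, the surrounding class is omitted:

public Schema outputSchema(Schema input) {
    try {
        // Describe the tuples inside the bag: a single field of type long.
        Schema tupleSchema = new Schema(
            new Schema.FieldSchema("sum", DataType.LONG));
        // Wrap that schema in a field of type bag.
        return new Schema(
            new Schema.FieldSchema("sums", tupleSchema, DataType.BAG));
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}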

Error Handling and Progress Reporting

Our previous examples have given some hints of how to deal with errors. When your UDF encounters an error, you have a couple of choices on how to handle it. The most common case is to issue a warning and return a null. This tells Pig that your UDF failed and its output should be viewed as unknown. We saw an example of this when the Pow function detected overflow:

for (int i = 0; i < exponent; i++) {
    long preresult = result;
    result *= base;
    if (preresult > result) {
        // We overflowed.  Give a warning, but do not throw an
        // exception.
        warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
        // Returning null will indicate to Pig that we failed but
        // we want to continue execution.
        return null;
    }
}

warn, a method of EvalFunc, takes a message that you provide as well as a warning code. The warning codes are in org.apache.pig.PigWarning, including several user-defined codes that you can use if none of the provided codes matches your situation. These warnings are aggregated by Pig and reported to the user at the end of the job.

Warning and returning null is convenient because it allows your job to continue. When you are processing billions of records, you do not want your job to fail because one record out of all those billions had a chararray where you expected an int. Given enough data, the odds are overwhelming that a few records will be bad, and most calculations will be fine if a few data points are missing.

For errors that are not tolerable, your UDF can throw an exception. If Pig catches an exception, it will assume that you are asking to stop everything, and it will cause the task to fail. Hadoop will then restart your task. If any particular task fails three times, Hadoop will not restart it again. Instead, it will kill all the other tasks and declare the job a failure.

When you have concluded that you do need an exception, you should also issue a log message so that you can read the task logs later and get more context to determine what happened. EvalFunc has a member log that is an instance of org.apache.commons.logging.Log. Hadoop prints any log messages into logfiles on the task machine, which are available from the JobTracker UI. See “MapReduce Job Status” for details. You can also print info messages into the log to help you with debugging.
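
For example, an exec that logs some context before giving up might look like this sketch; the processing method it calls is hypothetical, and log is the EvalFunc member mentioned above:

public Long exec(Tuple input) throws IOException {
    try {
        return computeResult(input);   // hypothetical processing method
    } catch (Exception e) {
        // This message lands in the task logs on the machine that ran the task.
        log.error("Failed while processing record: " + input, e);
        throw new IOException("Giving up on this record", e);
    }
}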

In addition to error reporting, some UDFs will need to report progress. Hadoop listens to its tasks to make sure they are making progress. If it does not hear from a task for five minutes, it concludes that the task died or went into an infinite loop. It then kills the task if it is still running, cleans up its resources, and restarts the task elsewhere. Pig reports progress to Hadoop on a regular basis. However, if you have a UDF that is very compute-intensive and a single invocation of it might run for more than five minutes, you should also report progress. To do this, EvalFunc provides a member reporter. By invoking reporter.progress() or reporter.progress(String msg) (where msg can say whatever you want) at least every five minutes, your UDF will avoid being treated as a timeout.
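
A compute-intensive exec might report progress periodically, along these lines; the loop bound and the work being done are illustrative, and reporter is the EvalFunc member described above:

public Long exec(Tuple input) throws IOException {
    long sum = 0;
    long iterations = (Long)input.get(0);
    for (long i = 0; i < iterations; i++) {
        sum += i * i;                          // stand-in for real work
        if (i % 1000000 == 0 && reporter != null) {
            // Tell Hadoop this task is still alive.
            reporter.progress("processed " + i + " iterations");
        }
    }
    return sum;
}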

Constructors and Passing Data from Frontend to Backend

Our discussion so far assumes that your UDF knows everything it needs to know at development time. This is not always the case. Consider a UDF that needs to read a lookup table from HDFS. You would like to be able to declare the filename when you use the UDF. You can do that by defining a nondefault constructor for your UDF.

By default, EvalFuncs have a no-argument constructor, but you can provide a constructor that takes one or more String arguments. This alternate constructor is then referenced in Pig Latin by using the define statement to define the UDF; see “define and UDFs” for details.

As an example, we will look at a new UDF, MetroResolver. This UDF takes a city name as input and returns the name of the larger metropolitan area that city is part of. For example, given Pasadena, it will return Los Angeles. Based on which country the input cities are in, a different lookup table will be needed. The name of a file in HDFS that contains this lookup table can be provided as a constructor argument. The class declaration, members, and constructor for our UDF look like this:

// java/com/acme/marketing/MetroResolver.java
/**
 * A lookup UDF that maps cities to metropolitan areas.
 */
public class MetroResolver extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File that contains a lookup table mapping cities to metro
     * areas.  The file must be located on the filesystem where this UDF will
     * run.
     */
    public MetroResolver(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }
}

The UDF can now be invoked in a Pig Latin script like this:

register 'acme.jar';
define MetroResolver com.acme.marketing.MetroResolver('/user/you/cities/us');
A = load 'input' as (city:chararray);
B = foreach A generate city, MetroResolver(city);
dump B;

The filename /user/you/cities/us will be passed to MetroResolver every time Pig constructs it. However, our UDF is not yet complete because we have not constructed the lookup table. In fact, we explicitly set it to null. It does not make sense to construct it in the constructor, because the constructor will be invoked on both the frontend and backend. There are forms of dark magic that will allow the UDF to figure out whether it is being invoked on the frontend or backend, but I cannot recommend them, because they are not guaranteed to work the same between releases. It is much better to do the lookup table construction in a method that we know will be called only in the backend.

EvalFunc does not provide an initialize method that it calls on the backend before it begins processing. You can work around this by keeping a flag to determine whether you have initialized your UDF in a given task. The exec function for MetroResolver does this by tracking whether lookup is null:

public String exec(Tuple input) throws IOException {
    if (lookup == null) {
        // We have not been initialized yet, so do it now.

        lookup = new HashMap<String, String>();
        // Get an instance of the HDFS FileSystem class so
        // we can read a file from HDFS.  We need a copy of
        // our configuration to do that.
        // Read the configuration from the UDFContext.
        FileSystem fs = FileSystem.get(UDFContext.getUDFContext().getJobConf());
        DataInputStream in = fs.open(new Path(lookupFile));
        String line;
        while ((line = in.readLine()) != null) {
            String[] toks = line.split(":", 2);
            lookup.put(toks[0], toks[1]);
        }
        in.close();
    }
    return lookup.get((String)input.get(0));
}

This initialization section handles opening the file and reading it. In order to open the file, it must first connect to HDFS. This is accomplished by FileSystem.get. This method in turn needs a JobConf object, which is where Hadoop stores all its job information. The JobConf object can be obtained using UDFContext, which we will cover in more detail later. Note that obtaining JobConf in this way works only on the backend, as no job configuration exists on the frontend.

Once we are connected to HDFS, we open the file and read it as we would any other file. It is parsed into two fields and put into the hash table. All subsequent calls to exec will just be lookups in the hash table.

Loading the distributed cache

Our MetroResolver UDF opens and reads its lookup file from HDFS, which you will often want. However, having hundreds or thousands of map tasks open the same file on HDFS at the same time puts significant load on the NameNode and the DataNodes that host the file’s blocks. To avoid this situation, Hadoop provides the distributed cache, which allows users to preload HDFS files locally onto the nodes their tasks will run on. For details, see "Distributed Cache".

Let’s write a second version of MetroResolver that uses the distributed cache. Beginning in version 0.9, EvalFunc provides a method getCacheFiles that is called on the frontend. Your UDF returns a list of files from this method that it wants in the distributed cache. The format of each file is client_file#task_file, where client_file is the path to the file on your client, and task_file is the name the file will be given on your task node. task_file is relative to your UDF’s working directory on the backend. You should place any files in your working directory rather than using an absolute path. task_file will be a local file on the task node and should be read using standard Java file utilities. It should not be read using HDFS’s FileSystem:

// java/com/acme/marketing/MetroResolverV2.java
/**
 * A lookup UDF that maps cities to metropolitan areas, this time using the
 * Distributed Cache.
 */
public class MetroResolverV2 extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File that contains a lookup table mapping cities to metro
     * areas.  The file must be located on the filesystem where this UDF will
     * run.
     */
    public MetroResolverV2(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }

    public String exec(Tuple input) throws IOException {
        if (lookup == null) {
            // We have not been initialized yet, so do it now.
            lookup = new HashMap<String, String>();

            // Open the file as a local file.
            FileReader fr = new FileReader("./mrv2_lookup");
            BufferedReader d = new BufferedReader(fr);
            String line;
            while ((line = d.readLine()) != null) {
                String[] toks = line.split(":", 2);
                lookup.put(toks[0], toks[1]);
            }
            fr.close();
        }
        return lookup.get((String)input.get(0));
    }

    public List<String> getCacheFiles() {
        List<String> list = new ArrayList<String>(1);
        // We were passed the name of the file on HDFS.  Append a
        // name for the file on the task node.
        list.add(lookupFile + "#mrv2_lookup");
        return list;
    }
}

UDFContext

Constructor arguments work as a way to pass information into your UDF, if you know the data at the time the script is written. You can extend this using parameter substitution (see "Parameter Substitution") so that data can be passed when the script is run. But some information you want to pass from frontend to backend cannot be known when the script is run, or it might not be accessible in String form on the command line. An example is collecting properties from the environment and passing them.

To allow UDFs to pass data from the frontend to the backend, starting in version 0.8, Pig provides a singleton class, UDFContext. Your UDF obtains a reference to it by calling getUDFContext. We have already seen that UDFs can use UDFContext to obtain a copy of the JobConf. Beginning in version 0.9, UDFContext also captures the System properties on the client and carries them to the backend. Your UDF can then obtain them by calling getClientSystemProperties.
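
For example, a UDF might pick up a System property that was set on the client when Pig was launched. The sketch below assumes a property named acme.analysis.mode was set with -D on the command line; the class and property name are illustrative:

import java.io.IOException;
import java.util.Properties;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;

public class ModeAwareResolver extends EvalFunc<String> {

    public String exec(Tuple input) throws IOException {
        // System properties captured on the client, for example one set
        // with -Dacme.analysis.mode=strict when launching Pig.
        Properties clientProps =
            UDFContext.getUDFContext().getClientSystemProperties();
        String mode = clientProps.getProperty("acme.analysis.mode", "default");
        return mode + ":" + input.get(0);
    }
}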

UDFContext also provides mechanisms for you to pass a properties object explicitly for your UDF. You can either pass a properties object for all UDFs of the same class or pass a specific object for each instance of your UDF. To use the same one for all instances of your UDF, call getUDFProperties(this.getClass()). This will return a Properties object that is a reference to a properties object kept by UDFContext. UDFContext will capture and transmit to the backend any changes made in this object. You can call this in outputSchema, which is guaranteed to be called in the frontend. When you want to read the data, call the same method again in your exec method. When using the object in the exec method, keep in mind that any changes made to the returned Properties will not be transmitted to other instances of the UDF on the backend, unless you happen to have another instance of the same UDF in the same task. This is a mechanism for sending information from the frontend to the backend, not between instances in the backend.

Sometimes you will want to transmit different data to different instances of the same UDF. By different instances I mean different invocations in your Pig Latin script, not different instantiations in various map and reduce tasks. To support this, UDFContext provides getUDFProperties(Class, String[]). The constructor arguments to your UDF are a good candidate to be passed as the array of String. This allows each instance of the UDF to differentiate itself. If your UDF does not take constructor arguments, or all arguments have the same value, you can add one unused argument that is solely to distinguish separate instances of the UDF.
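
Here is a sketch of that pattern, in which the constructor argument both configures the UDF and distinguishes its properties from those of other instances of the same class. The class, property names, and output schema are illustrative assumptions:

import java.io.IOException;
import java.util.Properties;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;

public class PerInstanceUdf extends EvalFunc<String> {

    private String[] instanceArgs;
    private Properties myProperties = null;

    public PerInstanceUdf(String tag) {
        // The constructor argument doubles as the key that separates this
        // instance's properties from other instances of the same class.
        instanceArgs = new String[] { tag };
    }

    @Override
    public Schema outputSchema(Schema input) {
        // Called on the frontend: stash a value for the backend to read.
        Properties p = UDFContext.getUDFContext()
            .getUDFProperties(this.getClass(), instanceArgs);
        p.setProperty("planned.at", Long.toString(System.currentTimeMillis()));
        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
    }

    @Override
    public String exec(Tuple input) throws IOException {
        if (myProperties == null) {
            // Called on the backend: same class plus same args retrieves the
            // same properties object that was populated on the frontend.
            myProperties = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), instanceArgs);
        }
        return myProperties.getProperty("planned.at");
    }
}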

Consider a UDF that has its own properties file, which might be useful if you want to pass different properties to different UDFs, or if you have many UDF-specific properties that you want to change without changing your Pig properties file. Let’s write a second version of the stock analyzer UDF that we used in Chapter 6:

// java/com/acme/financial/AnalyzeStockV2.java
/**
 * This UDF takes a bag of information about a stock and
 * produces a floating-point score between 1 and 100, 
 * 1 being sell, 100 being buy.
 */
public class AnalyzeStockV2 extends EvalFunc<Float> {

    Random r = new Random();
    Properties myProperties = null;

    @Override
    public Float exec(Tuple input) throws IOException {
        if (myProperties == null) {
            // Retrieve our class-specific properties from UDFContext.
            myProperties =
                UDFContext.getUDFContext().getUDFProperties(this.getClass());
        }

        // Make sure the input isn't null and is of the right size.
        if (input == null || input.size() != 1) return null;

        DataBag b = (DataBag)input.get(0);
        for (Tuple t : b) {
            // Do some magic analysis, using properties from myProperties to
            // decide how ...
        }
        return r.nextFloat() * 100;
    }
    @Override
    public Schema outputSchema(Schema input) {
        try {
            // Read our properties file.
            Properties prop = new Properties();
            prop.load(new FileInputStream("/tmp/stock.properties"));
            // Get a properties object specific to this UDF class.
            UDFContext context = UDFContext.getUDFContext();
            Properties udfProp = context.getUDFProperties(this.getClass());
            // Copy our properties into it.  There is no need to pass it
            // back to UDFContext.
            for (Map.Entry<Object, Object> e : prop.entrySet()) {
                udfProp.setProperty((String)e.getKey(), (String)e.getValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
    }

}

Overloading UDFs

Sometimes you want different UDF implementations depending on the data type the UDF is processing. For example, MIN(long) should return a long, whereas MIN(int) should return an int. To enable this, EvalFunc provides the method getArgToFuncMapping. If this method returns a null, Pig will use the current UDF. To provide a list of alternate UDFs based on the input types, this function returns a list of FuncSpecs. A FuncSpec is a Pig class that describes a UDF. Each of these FuncSpecs describes a set of expected input arguments and the UDF, as a Java class, that should be used to handle them. Pig's typechecker will use this list to determine which Java class to place in the execution pipeline (more on this later). The getArgToFuncMapping of Pig's built-in MIN function looks like this:

// src/org/apache/pig/builtin/MIN.java
public List<FuncSpec> getArgToFuncMapping()
throws FrontendException {
  List<FuncSpec> funcList = new ArrayList<FuncSpec>();

  // The first element in the list is this class itself, which is built to
  // handle the case where the input is a bytearray.  So we return our own
  // classname and a schema that indicates this function expects a BAG with 
  // tuples that have one field, which is a bytearray.  generateNestedSchema is a
  // helper method that generates schemas of bags that have tuples with one
  // field.
  funcList.add(new FuncSpec(this.getClass().getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));

  // If our input schema is a bag with tuples with one field that is a double,
  // then we use the class DoubleMin instead of MIN to implement min.
  funcList.add(new FuncSpec(DoubleMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));

  // and so on...
  funcList.add(new FuncSpec(FloatMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));

  funcList.add(new FuncSpec(IntMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));

  funcList.add(new FuncSpec(LongMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));

  funcList.add(new FuncSpec(StringMin.class.getName(),
      Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));

  return funcList;
}

Pig's typechecker goes through a set of steps to determine which FuncSpec is the closest match, and thus which Java class it should place in this job's execution pipeline. At each step, if it finds a match, it uses that match. If it finds more than one match at a given step, it will return an error that gives all the matching possibilities. If it finds no match in the whole list, it will also give an error. As an example of this, let's consider another version of the Pow UDF we built above. We will call this one PowV2. It takes either two longs or two doubles as input. Its getArgToFuncMapping looks like the following:

// java/com/acme/math/PowV2.java
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
    Schema s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    funcList.add(new FuncSpec(this.getClass().getName(), s));
    s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    funcList.add(new FuncSpec(LongPow.class.getName(), s));
    return funcList;
}

In the typechecker's search for the best UDF to use, step one is to look for an exact match, where all of the expected input declared by the UDF is matched by the actual input passed in Pig Latin. Pow(2.0, 3.1415) passes two doubles, so Pig Latin will choose PowV2. Pow(2L, 3L) passes two longs, so LongPow will be used.

Step two is to look for bytearrays that are passed into the UDF and see whether a match can be made by inserting casts for those bytearrays. For example, Pig will rewrite Pow(x, 2L), where x is a bytearray, as Pow((long)x, 2L) and use LongPow. This rule can confuse Pig when all arguments are bytearrays, because bytearrays can be cast to any type. Pow(x, y), where both x and y are bytearrays, results in an error message:

Multiple matching functions for com.acme.math.PowV2 with input schema: 
    ({double,double}, {long,long}). Please use an explicit cast.

Step three is to look for an implicit cast that will match one of the provided schemas. The implicit cast that is "closest" will be used. Implicit casting of numeric types goes from int to long to float to double, and by closest I mean the cast that requires the least steps in that list. So, Pow(2, 2) will use LongPow, whereas Pow(2.0, 2) will use PowV2.

Step four is to look for a working combination of steps two and three, bytearray casts plus implicit casts. Pow(x, 3.14f), where x is a bytearray, will use PowV2 by promoting 3.14f to a double and casting x to a double.

If after all these steps Pig still has not found a suitable method, it will fail and say it cannot determine which method to use. Pow('hello', 2) gives an error message:

Could not infer the matching function for com.acme.math.PowV2 as multiple or
none of them fit. Please use an explicit cast.

Memory Issues in Eval Funcs

Some operations you will perform in your UDFs will require more memory than is available. As an example, you might want to build a UDF that calculates the cumulative sum of a set of inputs. This will return a bag of values because, for each input, it needs to return the intermediate sum at that input.

Pig's bags handle spilling data to disk automatically when they pass a certain size threshold or when only a certain amount of heap space remains. Spilling to disk is expensive and should be avoided whenever possible. But if you must store large amounts of data in a bag, Pig will manage it.
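
Here is a sketch of the cumulative-sum UDF described above, built on a default bag so that Pig can spill the output to disk if it grows too large. The class name and the assumption of single-field numeric input tuples are illustrative:

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CumulativeSum extends EvalFunc<DataBag> {

    private TupleFactory tupleFactory = TupleFactory.getInstance();
    private BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        DataBag values = (DataBag)input.get(0);
        if (values == null) return null;

        // The output bag holds one intermediate sum per input tuple.  Using
        // the default bag lets Pig spill it to disk if it outgrows memory.
        DataBag output = bagFactory.newDefaultBag();
        long runningTotal = 0;
        for (Tuple t : values) {
            Object v = t.get(0);
            if (v == null) continue;
            runningTotal += DataType.toLong(v);
            Tuple out = tupleFactory.newTuple(1);
            out.set(0, runningTotal);
            output.add(out);
        }
        return output;
    }
}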

Bags are the only Pig data type that knows how to spill. Tuples and maps must fit into memory. A bag that is too large to fit in memory can still be placed inside a tuple or a map; the spilled bag does not count toward the memory that the containing tuple or map must fit into.