reosoftproductions.com
RODNEY AND ARLYN'S WEB SITE
Pig

Pig Recipes

Recipe 006

Goal

The input file contains attribute names in the second column. Use the Pig GROUP statement to collect together records with the same attribute name and place them in bags.
Source File: /home/hduser/data/KeyValuePair.txt
Target File: /home/hduser/data/Recipe006.out/
Parameter Substitution - None

After execution, the output directory should look like this:

hduser> ls -la
total 91000
drwxrwxr-x. 2 hduser hduser       84 Apr 26 07:15 .
drwxrwxr-x. 3 hduser hduser       49 Apr 26 07:15 ..
-rw-r--r--. 1 hduser hduser 92451650 Apr 26 07:15 part-r-00000
-rw-rw-r--. 1 hduser hduser   722288 Apr 26 07:15 .part-r-00000.crc
-rw-r--r--. 1 hduser hduser        0 Apr 26 07:15 _SUCCESS
-rw-rw-r--. 1 hduser hduser        8 Apr 26 07:15 ._SUCCESS.crc

Discussion

The group statement collects together records with the same key. It is important to understand that the grouping operator in Pig Latin is fundamentally different from the one in SQL. In SQL, the GROUP BY clause creates a group that must feed directly into one or more aggregate functions. In Pig Latin, there is no direct connection between group and aggregate functions. Instead, group does exactly what it says: it collects all records with the same value for the provided key together into a bag. You can then pass this bag to an aggregate function, or do other things with it:

-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
cnt   = foreach grpd generate group, COUNT(daily);

That example groups records by the key stock and then counts them. It is just as legitimate to group them and then store them for processing at a later time:

-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
store grpd into 'by_group';

The records coming out of the group statement have two fields, the key and the bag of collected records. The key field is named group. The bag is named for the alias that was grouped, so in the previous examples it will be named daily and have the same schema as the relation daily. If the relation daily has no schema, the bag daily will have no schema. For each record in the group, the entire record (including the key) is in the bag. Changing the last line of the previous script from store grpd... to describe grpd; will produce:

grpd: {group: bytearray,daily: {exchange: bytearray,stock: bytearray}}
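
Because the bag carries complete records, individual fields can be projected out of it. The following is a minimal sketch that reuses the NYSE_daily input from the scripts above; the script name, the alias exchs, and the field names given with "as" are illustrative choices, not part of the original example.

```pig
-- bagproject.pig (illustrative sketch)
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
-- daily.exchange projects one field out of every tuple in the bag,
-- giving one bag of single-field tuples per stock.
exchs = foreach grpd generate group as stock, daily.exchange as exchanges;
describe exchs;
```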

You can also group on multiple keys, but the keys must be surrounded by parentheses. The resulting records still have two fields. In this case, the group field is a tuple with a field for each key:

--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd  = group daily by (exchange, stock);
avg   = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
    stock: bytearray,date: bytearray,dividends: bytearray}}

You can also use all to group together all of the records in your pipeline:

--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily all;
cnt   = foreach grpd generate COUNT(daily);

The record coming out of group all has the chararray literal all as a key. Usually this does not matter because you will pass the bag directly to an aggregate function such as COUNT. But if you plan to store the record or use it for another purpose, you might want to project out the artificial key first.
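
Projecting out the artificial key can be sketched as follows. The script name, the alias bagonly, and the output name all_records are hypothetical, chosen only for illustration.

```pig
-- dropallkey.pig (illustrative sketch)
daily   = load 'NYSE_daily' as (exchange, stock);
grpd    = group daily all;
-- Keep only the bag; the artificial key 'all' is not projected.
bagonly = foreach grpd generate daily;
store bagonly into 'all_records';
```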

group usually will force a reduce phase. Grouping means collecting all records where the key has the same value. If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce, this will force it to pass through map, shuffle, and reduce phases.

Because grouping collects all records together with the same value for the key, you often get skewed results. That is, just because you have specified that your job have 100 reducers, there is no reason to expect that the number of values per key will be distributed evenly. They might have a Gaussian or power law distribution. For example, suppose you have an index of web pages and you group by the base URL. Certain values such as yahoo.com are going to have far more entries than most, which means that some reducers get far more data than others. Because your MapReduce job is not finished (and any subsequent ones cannot start) until all your reducers have finished, this skew will significantly slow your processing. In some cases it will also be impossible for one reducer to manage that much data.

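Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop's combiner, which pre-aggregates records on the map side so that each map task sends a reducer roughly one partial result per key rather than every record. This does not remove all skew, but it places a bound on it: the number of records a reducer receives for a given key is limited by the number of mappers. And because for most jobs the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.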

Unfortunately, not all calculations can be done using the combiner. Calculations that can be decomposed into any number of steps, such as sum, are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user's actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing their interaction with the site.
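
Average is a common illustration of an algebraic calculation: averaging partial averages does not in general give the right answer, but a sum and a count can each be combined across any number of intermediate steps and divided once at the end. The sketch below spells that decomposition out by hand in Pig Latin. It assumes the dividends field can be loaded as a double, and the script name and the aliases parts and avgd are hypothetical; in practice the built-in AVG function would simply be called directly.

```pig
-- avgparts.pig (illustrative sketch)
daily = load 'NYSE_daily' as (exchange, stock, date, dividends:double);
grpd  = group daily by stock;
-- Partial results that can be merged: a sum and a count per stock.
parts = foreach grpd generate group as stock,
            SUM(daily.dividends)   as total,
            COUNT(daily.dividends) as n;
-- Final step: divide once, after the sums and counts have been combined.
avgd  = foreach parts generate stock, total / (double)n as avg_dividends;
```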

Pig's operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly. UDFs can indicate when they can work with the combiner by implementing the Algebraic interface. For information on how to make your UDFs use the combiner, see "Algebraic Interface".

For information on how to determine the level of parallelism when executing your group operation, see "Parallel". Also, keep in mind that when using group all, you are necessarily serializing your pipeline. That is, this step and any step after it until you split out the single bag now containing all of your records will not be done in parallel.
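
As a brief sketch, Pig Latin accepts a parallel clause on operators that start a reduce phase, including group. The script name is hypothetical and the value 10 is an arbitrary illustration, not a recommendation.

```pig
-- parallel.pig (illustrative sketch)
daily = load 'NYSE_daily' as (exchange, stock);
-- Request 10 reducers for the reduce phase started by this group.
grpd  = group daily by stock parallel 10;
cnt   = foreach grpd generate group, COUNT(daily);
```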

Finally, group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group. Note that this is in direct contradiction to the way expressions handle nulls (remember that neither null == null nor null != null is true) and to the way join (see "Join") handles nulls.
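
If a single group of null-keyed records is not wanted, one option is to filter those records out before grouping. This is only a sketch of that approach; the script name and the alias notnull are hypothetical.

```pig
-- nonullkeys.pig (illustrative sketch)
daily   = load 'NYSE_daily' as (exchange, stock);
-- Drop records whose grouping key is null before grouping.
notnull = filter daily by stock is not null;
grpd    = group notnull by stock;
cnt     = foreach grpd generate group, COUNT(notnull);
```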

Solution

The following is the script file.

/*****************************************************************************/
/* Recipe006.pig                                                             */
/*                                                                           */
/* Purpose:                                                                  */
/* The input file contains attribute names in the second column.  Use the    */
/* Pig GROUP statement to collect together records with the same attribute   */
/* name and place them in bags.                                              */
/*                                                                           */
/* Parameter Substitution - None                                             */
/*                                                                           */
/* Pig Execution Mode:  local                                                */
/* Pig Batch Execution:                                                      */
/*   pig -x local Recipe006.pig                                              */
/*                                                                           */
/* The target directory must not exist prior to executing this script.  Use  */
/* this command to safely delete the target directory:                       */
/*   rm -rf /home/hduser/data/Recipe006.out                                  */
/*                                                                           */
/*****************************************************************************/
/* Date     Initials Description                                             */
/* -------- -------- ------------------------------------------------------- */
/* 20160526 Reo      Initial.                                                */
/*****************************************************************************/

/*****************************************************************************/
/* The source file contains fields where all of the values are enclosed in   */
/* double quotes.  In some cases, there are commas (,) within the double     */
/* quotes.  If LOAD is used with PigStorage(','), the data will be parsed    */
/* incorrectly.  Therefore, the CSVExcelStorage method will be used to       */
/* ensure good parsing.  CSVExcelStorage is in the PiggyBank, so it must be  */
/* REGISTERED.                                                               */
/*****************************************************************************/
REGISTER '/usr/local/pig/contrib/piggybank/java/piggybank.jar';

/*****************************************************************************/
/* Set up an alias for the CSVExcelStorage Java class.                       */
/*****************************************************************************/
DEFINE CSVExcelStorage org.apache.pig.piggybank.storage.CSVExcelStorage();

/*****************************************************************************/
/* Read in the data using a comma (,) as the delimiter.                      */
/*****************************************************************************/
DVDData = LOAD '/home/hduser/data/KeyValuePair.txt'
  USING CSVExcelStorage(',')
  AS
  (
    DVDName:chararray,
    AttributeName:chararray,
    AttributeValue:chararray
  );

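/*****************************************************************************/
/* Group the records by attribute name.  Each output record has two fields:  */
/* the key (named group) and a bag (named DVDData) of the matching records.  */
/*****************************************************************************/
B = GROUP DVDData BY AttributeName;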

/*****************************************************************************/
/* Time to STORE the grouped data.                                           */
/*****************************************************************************/
STORE B INTO '/home/hduser/data/Recipe006.out';
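
As the Discussion notes, the grouped relation can also be passed to an aggregate function rather than stored as-is. The following is a minimal sketch of that variation on the recipe, counting the records collected for each attribute name. The script name, the alias AttributeCounts, and the field name RecordCount are hypothetical; the paths and schema are copied from the script above.

```pig
-- Recipe006_counts.pig (illustrative sketch; file name is hypothetical)
REGISTER '/usr/local/pig/contrib/piggybank/java/piggybank.jar';
DEFINE CSVExcelStorage org.apache.pig.piggybank.storage.CSVExcelStorage();

DVDData = LOAD '/home/hduser/data/KeyValuePair.txt'
  USING CSVExcelStorage(',')
  AS (DVDName:chararray, AttributeName:chararray, AttributeValue:chararray);

B = GROUP DVDData BY AttributeName;

-- One output record per attribute name, with the size of its bag.
-- COUNT skips tuples whose first field (DVDName) is null.
AttributeCounts = FOREACH B GENERATE group AS AttributeName,
                                     COUNT(DVDData) AS RecordCount;
DUMP AttributeCounts;
```

DUMP writes the result to the console, so this variation does not need the target directory to be removed first.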