Pig Recipes
Goal
The input file contains attribute names in the second column. Use the Pig GROUP
statement to collect together records with the same attribute name and place them in bags.
Source File: /home/hduser/data/KeyValuePair.txt
Target File: /user/hduser/data/Recipe006.out/
Parameter Substitution - None
After execution, the output directory should look like this:
hduser> ls -la
total 91000
drwxrwxr-x. 2 hduser hduser       84 Apr 26 07:15 .
drwxrwxr-x. 3 hduser hduser       49 Apr 26 07:15 ..
-rw-r--r--. 1 hduser hduser 92451650 Apr 26 07:15 part-r-00000
-rw-rw-r--. 1 hduser hduser   722288 Apr 26 07:15 .part-r-00000.crc
-rw-r--r--. 1 hduser hduser        0 Apr 26 07:15 _SUCCESS
-rw-rw-r--. 1 hduser hduser        8 Apr 26 07:15 ._SUCCESS.crc
Discussion
The group statement collects together records with the same key. It is important to understand that the grouping operator in Pig Latin is fundamentally different than the one in SQL. In SQL, the GROUP BY clause creates a group that must feed directly into one or more aggregate functions. In Pig Latin, there is no direct connection between group and aggregate functions. Instead, group does exactly what it says: collects all records with the same value for the provided key together into a bag. You can then pass this to an aggregate function if you want or do other things with it:
-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
cnt = foreach grpd generate group, COUNT(daily);
That example groups records by the key stock and then counts them. It is just as legitimate to group them and then store them for processing at a later time:
-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
store grpd into 'by_group';
The records coming out of the group statement have two fields, the key and the bag of collected records. The key field is named group. The bag is named for the alias that was grouped, so in the previous examples it will be named daily and have the same schema as the relation daily. If the relation daily has no schema, the bag daily will have no schema. For each record in the group, the entire record (including the key) is in the bag. Changing the last line of the previous script from store grpd... to describe grpd; will produce:
grpd: {group: bytearray,daily: {exchange: bytearray,stock: bytearray}}
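Because the bag keeps the complete records, you can reach inside it from a foreach using the alias.field syntax. A minimal sketch, reusing the NYSE_daily example above (the exchanges alias is just for illustration); the projected result is itself a bag of exchange values per stock:

-- sketch: project a single field out of the bag produced by group
daily     = load 'NYSE_daily' as (exchange, stock);
grpd      = group daily by stock;
-- daily.exchange reaches inside the bag: each output record holds the key
-- plus a bag of the exchange values from the grouped records
exchanges = foreach grpd generate group, daily.exchange;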
You can also group on multiple keys, but the keys must be surrounded by parentheses. The resulting records still have two fields. In this case, the group field is a tuple with a field for each key:
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;

grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,stock: bytearray,date: bytearray,dividends: bytearray}}
You can also use all to group together all of the records in your pipeline:
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
The record coming out of group all has the chararray literal all as a key. Usually this does not matter because you will pass the bag directly to an aggregate function such as COUNT. But if you plan to store the record or use it for another purpose, you might want to project out the artificial key first.
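A minimal sketch of that projection, reusing the group all example above (the justbag alias and the output path are hypothetical):

-- sketch: drop the artificial 'all' key before storing
daily   = load 'NYSE_daily' as (exchange, stock);
grpd    = group daily all;
-- generate only the bag; the chararray key 'all' is projected away
justbag = foreach grpd generate daily;
store justbag into 'all_records_bagged';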
group usually will force a reduce phase. Grouping means collecting all records where the key has the same value. If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce, this will force it to pass through map, shuffle, and reduce phases.
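If you want to see where that boundary falls for a particular script, Pig's explain command prints the logical, physical, and MapReduce plans for an alias. A minimal sketch, again using the NYSE_daily example:

-- sketch: inspect the plans to see the shuffle and reduce that group introduces
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
explain grpd;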
Because grouping collects all records together with the same value
for the key, you often get skewed results. That is, just because you have
specified that your job have 100 reducers, there is no reason to expect that
the number of values per key will be distributed evenly. They might have a Gaussian or power law distribution. For example, suppose you have an index of web pages and you group by the base URL. Certain values such as yahoo.com
are going to have far more entries than most, which means that some reducers get far more data than others. Because your MapReduce job is not finished (and any subsequent ones cannot start) until all your reducers have finished, this skew will significantly slow your processing. In some cases it will also be impossible for one reducer to manage that much data.
Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop's combiner. This does not remove all skew, but it places a bound on it. And because for most jobs the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.
Unfortunately, not all calculations can be done using the combiner. Calculations that can be decomposed into any number of steps, such as sum, are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user's actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing their interaction with the site.
Pig's operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly. UDFs can indicate when they can work with the combiner by implementing the Algebraic interface. For information on how to make your UDFs use the combiner, see "Algebraic Interface".
For information on how to determine the level of parallelism when executing your group operation, see "Parallel". Also, keep in mind that when using group all, you are necessarily serializing your pipeline. That is, this step and any step after it, until you split out the single bag now containing all of your records, will not be done in parallel.
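As a quick illustration, the parallel clause can be attached to the group statement itself to request a specific number of reducers (the value 10 below is an arbitrary example):

-- sketch: ask for 10 reducers for this grouping step
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock parallel 10;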
Finally, group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group. Note that this is in direct contradiction to the way expressions handle nulls (remember that neither null == null nor null != null is true) and to the way join (see "Join") handles nulls.
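Because every record with a null key lands in the same group, that one group can grow very large. If those records are not needed, one option is to filter them out before grouping; a minimal sketch, reusing the NYSE_daily field names (the nonnull alias is just for illustration):

-- sketch: keep records with a null key out of the group
daily   = load 'NYSE_daily' as (exchange, stock);
nonnull = filter daily by stock is not null;
grpd    = group nonnull by stock;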
Solution
The following is the script file.
/*****************************************************************************/
/* Recipe006.pig */
/* */
/* Purpose: */
/*   The input file contains attribute names in the second column. Use the */
/*   Pig GROUP statement to collect together records with the same */
/*   attribute name and place them in bags. */
/* */
/* Parameter Substitution - None */
/* */
/* Pig Execution Mode: local */
/* Pig Batch Execution: */
/*   pig -x local Recipe006.pig */
/* */
/* The target directory must not exist prior to executing this script. Use */
/* this command to safely delete the target directory: */
/*   rm -rf /home/hduser/data/Recipe006.out */
/* */
/*****************************************************************************/
/* Date     Initials Description */
/* -------- -------- ------------------------------------------------------- */
/* 20160526 Reo      Initial. */
/*****************************************************************************/

/*****************************************************************************/
/* The source file contains fields where all of the values are enclosed in */
/* double quotes. In some cases, there are commas (,) within the double */
/* quotes. If LOAD is used with PigStorage(','), the data will be parsed */
/* incorrectly. Therefore, CSVExcelStorage will be used to ensure correct */
/* parsing. CSVExcelStorage is in the Piggybank, so its jar must be */
/* REGISTERed. */
/*****************************************************************************/
REGISTER '/usr/local/pig/contrib/piggybank/java/piggybank.jar';

/*****************************************************************************/
/* Set up an alias to the Java package. */
/*****************************************************************************/
DEFINE CSVExcelStorage org.apache.pig.piggybank.storage.CSVExcelStorage();

/*****************************************************************************/
/* Read in the data using a comma (,) as the delimiter. */
/*****************************************************************************/
DVDData = LOAD '/home/hduser/data/KeyValuePair.txt'
          USING CSVExcelStorage(',')
          AS
          (
            DVDName:chararray,
            AttributeName:chararray,
            AttributeValue:chararray
          );

/*****************************************************************************/
/* Group the records by attribute name. Each output record has the key in a */
/* field named "group" and a bag named DVDData holding every record that */
/* shares that attribute name. */
/*****************************************************************************/
B = GROUP DVDData BY AttributeName;

/*****************************************************************************/
/* Time to STORE the grouped data. */
/*****************************************************************************/
STORE B INTO '/home/hduser/data/Recipe006.out';
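As an optional follow-up (not part of the recipe), the grouped relation B could be fed to an aggregate instead of, or in addition to, being stored. This sketch counts how many records were collected for each attribute name; the Counts alias is just for illustration:

/* Sketch only: count the records collected per attribute name. */
Counts = FOREACH B GENERATE group AS AttributeName, COUNT(DVDData) AS RecordCount;
DUMP Counts;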