Pig Tutorial

The Pig tutorial shows you how to run two Pig scripts in Local mode and Hadoop mode.

Local Mode: To run the scripts in local mode, no Hadoop or HDFS installation is required. All files are installed and run from your local host and file system.


Hadoop Mode: To run the scripts in hadoop (mapreduce) mode, you need access to a Hadoop cluster and HDFS installation, available through the Hadoop Virtual Machine provided with this tutorial.


Java Installation (Note: already set up on the Hadoop VM.)

1. Java 1.6.x (from Sun) is installed in /usr/jre16.
2. The JAVA_HOME environment variable is set to the root of your Java installation in the "/home/hadoop-user/.profile" file.

Pig Installation (Note: already set up on the Hadoop VM.)

1. The pig.jar and tutorial files are stored in the "/home/hadoop-user/pig" directory.
2. The PIGDIR environment variable is set to "/home/hadoop-user/pig/" in the .profile file of hadoop-user.

Pig Scripts: Local Mode

To run the Pig scripts in local mode, do the following:

1. Go to the /home/hadoop-user/pig directory on the Hadoop VM.
2. Review Pig Script 1 and Pig Script 2.
3. Execute the following command (using either script1-local.pig or script2-local.pig):

$ java -cp $PIGDIR/pig.jar org.apache.pig.Main -x local script1-local.pig

4. Review the result file (either script1-local-results.txt or script2-local-results.txt):

$ ls -l script1-local-results.txt
$ cat script1-local-results.txt

Pig Scripts: Hadoop Mode

To run the Pig scripts in hadoop (mapreduce) mode, do the following:

1. Go to the /home/hadoop-user/pig directory on the Hadoop VM.
2. Review Pig Script 1 and Pig Script 2.
3. Copy the excite.log.bz2 file from the pigtmp directory to the HDFS directory:

$ hadoop fs -copyFromLocal excite.log.bz2 .

4. The HADOOPSITEPATH environment variable is set to the location of your hadoop-site.xml file, i.e., the "/home/hadoop-user/hadoop-tutorial-conf/" directory.
5. Execute the following command (using either script1-hadoop.pig or script2-hadoop.pig):

$ java -cp $PIGDIR/pig.jar:$HADOOPSITEPATH org.apache.pig.Main script1-hadoop.pig

6. Review the result files (located in either the script1-hadoop-results or script2-hadoop-results HDFS directory):

$ hadoop fs -ls script1-hadoop-results
$ hadoop fs -cat 'script1-hadoop-results/*' | less


Pig Script 1: Query Phrase Popularity

The Query Phrase Popularity script (script1-local.pig or script1-hadoop.pig) processes a search query log file from the Excite search engine and finds search phrases that occur with particularly high frequency during certain times of the day.

The script is shown here:

*Register the tutorial JAR file so that the included UDFs can be called in the script.

REGISTER ./tutorial.jar;

*Use the PigStorage function to load the excite log file (excite.log or excite-small.log) into the “raw” bag as an array of records with the fields user, time, and query.

raw = LOAD 'excite.log' USING PigStorage('\t') AS (user, time, query);

*Call the NonURLDetector UDF to remove records if the query field is empty or a URL.

clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);

*Call the ToLower UDF to change the query field to lowercase.

clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;

*Because the log file only contains queries for a single day, we are only interested in the hour. The excite query log timestamp format is YYMMDDHHMMSS. Call the ExtractHour UDF to extract the hour (HH) from the time field.

houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;

*Call the NGramGenerator UDF to compose the n-grams of the query.

ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;

*Use the DISTINCT command to get the unique n-grams for all records.

ngramed2 = DISTINCT ngramed1;

*Use the GROUP command to group records by n-gram and hour.

hour_frequency1 = GROUP ngramed2 BY (ngram, hour);

*Use the COUNT function to get the count (occurrences) of each n-gram.

hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;

*Use the GROUP command to group records by n-gram only. Each group now corresponds to a distinct n-gram and has the count for each hour.

uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;

*For each group, identify the hour in which this n-gram is used with a particularly high frequency. Call the ScoreGenerator UDF to calculate a "popularity" score for the n-gram.

uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));

*Use the FOREACH-GENERATE command to assign names to the fields.

uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;

*Use the FILTER command to remove all records with a score less than or equal to 2.0.

filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;

*Use the ORDER command to sort the remaining records by hour and score.

ordered_uniq_frequency = ORDER filtered_uniq_frequency BY (hour, score);

*Use the PigStorage function to store the results. The output file contains a list of n-grams with the following fields: hour, ngram, score, count, mean.

STORE ordered_uniq_frequency INTO '/tmp/tutorial-results' USING PigStorage();
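
To make the data flow in the first half of Script 1 more concrete, here is a minimal Python sketch of the steps from ExtractHour through COUNT. It is not the tutorial's UDF code: the helper names (extract_hour, ngrams, hourly_counts) are hypothetical, the sample records are invented, and the n-gram size limit of 2 is an assumption about NGramGenerator rather than something stated above. Scoring, filtering, and ordering are left out.

```python
from collections import Counter

def extract_hour(timestamp):
    # Timestamps are YYMMDDHHMMSS, so the hour is characters 6-7.
    return timestamp[6:8]

def ngrams(query, max_size=2):
    # Yield every run of 1..max_size consecutive words in the query.
    # max_size=2 is an assumption, not taken from the tutorial text.
    words = query.split()
    for size in range(1, max_size + 1):
        for start in range(len(words) - size + 1):
            yield " ".join(words[start:start + size])

def hourly_counts(records):
    # records: iterable of (user, time, query) tuples, already cleaned.
    # DISTINCT step: a set keeps one (user, hour, ngram) per combination.
    distinct = {
        (user, extract_hour(time), gram)
        for user, time, query in records
        for gram in ngrams(query.lower())
    }
    # GROUP BY (ngram, hour) followed by COUNT.
    return Counter((gram, hour) for _user, hour, gram in distinct)

records = [
    ("u1", "970916071503", "Pig Latin"),
    ("u1", "970916071530", "pig latin"),  # same user, hour, and n-grams
    ("u2", "970916072011", "pig farm"),
]
counts = hourly_counts(records)
print(counts[("pig", "07")])        # 2: one per distinct user
print(counts[("pig latin", "07")])  # 1
```

Because of the DISTINCT step, a user who repeats a phrase within the same hour contributes only once to that hour's count.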

Pig Script 2: Temporal Query Phrase Popularity

The Temporal Query Phrase Popularity script (script2-local.pig or script2-hadoop.pig) processes a search query log file from the Excite search engine and compares the frequency of occurrence of search phrases across two time periods separated by twelve hours.

The script is shown here:

*Register the tutorial JAR file so that the user-defined functions (UDFs) can be called in the script.

REGISTER ./tutorial.jar;

*Use the PigStorage function to load the excite log file (excite.log or excite-small.log) into the “raw” bag as an array of records with the fields user, time, and query.

raw = LOAD 'excite.log' USING PigStorage('\t') AS (user, time, query);

*Call the NonURLDetector UDF to remove records if the query field is empty or a URL.

clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);

*Call the ToLower UDF to change the query field to lowercase.

clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;

*Because the log file only contains queries for a single day, we are only interested in the hour. The excite query log timestamp format is YYMMDDHHMMSS. Call the ExtractHour UDF to extract the hour from the time field.

houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;

*Call the NGramGenerator UDF to compose the n-grams of the query.

ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;

*Use the DISTINCT command to get the unique n-grams for all records.

ngramed2 = DISTINCT ngramed1;

*Use the GROUP command to group the records by n-gram and hour.

hour_frequency1 = GROUP ngramed2 BY (ngram, hour);

*Use the COUNT function to get the count (occurrences) of each n-gram.

hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;

*Use the FOREACH-GENERATE command to assign names to the fields.

hour_frequency3 = FOREACH hour_frequency2 GENERATE $0 as ngram, $1 as hour, $2 as count;

*Use the FILTER command to get the n-grams for hour ‘00’

hour00 = FILTER hour_frequency2 BY hour eq '00';

*Use the FILTER command to get the n-grams for hour ‘12’

hour12 = FILTER hour_frequency3 BY hour eq '12';

*Use the JOIN command to get the n-grams that appear in both hours.

same = JOIN hour00 BY $0, hour12 BY $0;

*Use the FOREACH-GENERATE command to record their frequency.

same1 = FOREACH same GENERATE hour_frequency2::hour00::group::ngram as ngram, $2 as count00, $5 as count12;

*Use the PigStorage function to store the results. The output file contains a list of n-grams with the following fields: ngram, count00, count12.

STORE same1 INTO '/tmp/tutorial-join-results' USING PigStorage();
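
The JOIN at the end of Script 2 can be pictured as matching two hour-specific tables on the n-gram. The following Python sketch is only an illustration: the function name common_ngrams and the sample counts are invented, and it models the result of the join rather than how Pig executes it.

```python
def common_ngrams(hour_frequency, first="00", second="12"):
    # hour_frequency: iterable of (ngram, hour, count) tuples,
    # the shape of hour_frequency3 in the script.
    first_counts = {g: c for g, h, c in hour_frequency if h == first}
    second_counts = {g: c for g, h, c in hour_frequency if h == second}
    # Inner join on ngram: keep only n-grams present in both hours.
    return sorted(
        (gram, first_counts[gram], second_counts[gram])
        for gram in first_counts.keys() & second_counts.keys()
    )

sample = [
    ("weather", "00", 3),
    ("weather", "12", 7),
    ("news", "00", 4),
    ("recipes", "12", 5),
]
print(common_ngrams(sample))  # [('weather', 3, 7)]
```

N-grams that appear in only one of the two hours ("news" and "recipes" here) drop out, which matches the intent of keeping phrases seen in both periods.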

Pig Latin Operators

Pig Latin provides a number of operators which filter, join, or otherwise organize data.

FOREACH: The FOREACH command operates on each element of a data bag. This is useful, for instance, for processing each input record in a bag returned by a LOAD statement.

FOREACH bagname GENERATE expression, expression...

This statement iterates over the contents of a bag. It applies the expressions on the right of the GENERATE keyword to the data provided by the current record emitted from the bag. The expressions may be, for example, the names of fields. So to extract the names of all users who accessed the site (based on the query_log.txt example in the Loading Data Into Pig section), we could write a query like:

FOREACH queries GENERATE userId;

In the FOREACH statement, each element of the bag is considered independently. No expression can reference more than one element of the bag at a time; this allows the statement to be processed in parallel using Hadoop MapReduce.

Expressions emitted by the GENERATE element are not limited to the names of fields; they can be fields (by name like userId or by position like $0), constants, algebraic operations, map lookups, conditional expressions, or FLATTEN expressions, described below.

Finally, these expressions may also call user-provided functions that are written in Java. These user-provided functions have access to the entire current record through a Pig library; in this way, Pig can be used as the heavy-lifting component to automate record-by-record mapping using an application-specific Java function to perform tricky parsing or evaluation logic. Pig also provides several of the most commonly-needed functions, such as COUNT, AVG, MIN, MAX, and SUM.
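
As a rough mental model, FOREACH ... GENERATE behaves like mapping a set of expressions over each record independently. The Python sketch below uses made-up records and a hypothetical helper name (foreach_generate); it illustrates the semantics only, not how Pig evaluates the statement.

```python
def foreach_generate(bag, *expressions):
    # Apply every expression to each record on its own; no expression
    # sees more than one record, which is why the work can be parallelized.
    return [tuple(expr(record) for expr in expressions) for record in bag]

queries = [
    {"userId": "u1", "queryString": "Pig Latin", "timestamp": 100},
    {"userId": "u2", "queryString": "hadoop", "timestamp": 160},
]

# Roughly: FOREACH queries GENERATE userId, f(queryString), constant
result = foreach_generate(
    queries,
    lambda r: r["userId"],               # field by name
    lambda r: r["queryString"].lower(),  # function applied to a field
    lambda r: 1,                         # constant
)
print(result)  # [('u1', 'pig latin', 1), ('u2', 'hadoop', 1)]
```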

FLATTEN is an expression which will eliminate a level of nesting. Given a tuple which contains a bag, FLATTEN will emit several tuples each of which contains one record from the bag. For example, if we had a bag of records containing a person's name and a list of types of pets they own:

(Alice, { turtle, goldfish, cat })
(Bob, { dog, cat })

A FLATTEN command would eliminate the inner bags like so:

(Alice, turtle)
(Alice, goldfish)
(Alice, cat)
(Bob, dog)
(Bob, cat)
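
The same un-nesting can be sketched in a few lines of Python. The function name flatten and the list-based representation of bags are illustrative assumptions, not Pig's implementation.

```python
def flatten(records):
    # Each input record is (name, bag); emit one (name, item) per bag item.
    return [(name, item) for name, bag in records for item in bag]

owners = [
    ("Alice", ["turtle", "goldfish", "cat"]),
    ("Bob", ["dog", "cat"]),
]
for row in flatten(owners):
    print(row)
```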

FILTER statements iterate over a bag and return a new bag containing all elements which pass a conditional expression, e.g.:

adults = FILTER people BY age > 21;

The COGROUP and JOIN operations perform similar functions: they unite related data elements from multiple data sets. The difference is that JOIN acts like the SQL JOIN statement, creating a flat set of output records containing the joined cross-product of the input records. The COGROUP operator, on the other hand, groups the elements by their common field and returns a set of records each containing two separate bags. The first bag is the records of the first data set with the common field, and the second bag is the records of the second data set containing the common field.

To illustrate the difference, suppose we had the flattened data set mapping people to their pets, and another flattened data set mapping people to their friends. We could create a "pets of friends" data set out of these like the following. Here are the input data sets:

pets: (owner, pet)
----------------------
(Alice, turtle)
(Alice, goldfish)
(Alice, cat)
(Bob, dog)
(Bob, cat)

friends: (friend1, friend2)
----------------------
(Cindy, Alice)
(Mark, Alice)
(Paul, Bob)

Here is what is returned by COGROUP:

COGROUP pets BY owner, friends BY friend2; returns:

( Alice, {(Alice, turtle), (Alice, goldfish), (Alice, cat)},
{(Cindy, Alice), (Mark, Alice)} )
( Bob, {(Bob, dog), (Bob, cat)}, {(Paul, Bob)} )

Contrasted with the more familiar, non-hierarchical JOIN operator:

JOIN pets BY owner, friends BY friend2; returns:

(Alice, turtle, Cindy)
(Alice, turtle, Mark)
(Alice, goldfish, Cindy)
(Alice, goldfish, Mark)
(Alice, cat, Cindy)
(Alice, cat, Mark)
(Bob, dog, Paul)
(Bob, cat, Paul)
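
One way to see the relationship between the two operators is to build JOIN on top of COGROUP. The Python sketch below does this for the pets and friends data; the function names and the positional key arguments are assumptions made for illustration, and Pig itself does not necessarily execute joins this way.

```python
def cogroup(left, left_key, right, right_key):
    # One output record per key: (key, left records, right records).
    keys = sorted({r[left_key] for r in left} | {r[right_key] for r in right})
    return [
        (k,
         [r for r in left if r[left_key] == k],
         [r for r in right if r[right_key] == k])
        for k in keys
    ]

def join(left, left_key, right, right_key):
    # Flatten each cogroup into the cross-product of its two bags.
    return [
        lrec + rrec
        for _key, lefts, rights in cogroup(left, left_key, right, right_key)
        for lrec in lefts
        for rrec in rights
    ]

pets = [("Alice", "turtle"), ("Alice", "goldfish"), ("Alice", "cat"),
        ("Bob", "dog"), ("Bob", "cat")]
friends = [("Cindy", "Alice"), ("Mark", "Alice"), ("Paul", "Bob")]

grouped = cogroup(pets, 0, friends, 1)
joined = join(pets, 0, friends, 1)
print(grouped[1])  # ('Bob', [('Bob', 'dog'), ('Bob', 'cat')], [('Paul', 'Bob')])
print(joined[0])   # ('Alice', 'turtle', 'Cindy', 'Alice')
```

Unlike the abbreviated listing above, this sketch keeps every field of both input records, so the key appears twice in each joined tuple.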

In general, the COGROUP command supports grouping on as many data sets as are desired. Three or more data sets can be joined in this fashion. It is also possible to group up elements of only a single data set; this is supported through an alternate keyword, GROUP.

A GROUP ... BY statement will organize a bag of records into bags of related items based on the field identified as their common key field. For example, the pets bag from the previous example could be grouped up with:

GROUP pets BY owner; returns:

( Alice, {(Alice, turtle), (Alice, goldfish), (Alice, cat)} )
( Bob, {(Bob, dog), (Bob, cat)} )

In this way, GROUP and FLATTEN are effectively inverses of one another.
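
The inverse relationship can be checked with a small Python sketch. The helper names group_by and flatten_groups are hypothetical, and lists stand in for Pig bags.

```python
def group_by(records, key_index):
    # Collect records into one bag per key, preserving first-seen key order.
    groups = {}
    for record in records:
        groups.setdefault(record[key_index], []).append(record)
    return list(groups.items())

def flatten_groups(grouped):
    # Undo the grouping by emitting every record of every bag.
    return [record for _key, bag in grouped for record in bag]

pets = [("Alice", "turtle"), ("Alice", "goldfish"), ("Alice", "cat"),
        ("Bob", "dog"), ("Bob", "cat")]

grouped = group_by(pets, 0)
print(grouped[1])                       # ('Bob', [('Bob', 'dog'), ('Bob', 'cat')])
print(flatten_groups(grouped) == pets)  # True
```

The round trip reproduces the input exactly here only because the sample records are already arranged by owner; in general the records come back as the same collection but possibly in a different order.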

More complicated statements can be realized as well: operations which expect a data set as input do not need to use an explicitly-named data set; they can use one generated "inline" with another FILTER, GROUP or other statement.

When the final data set has been created by a Pig Latin script, the output can be saved to a file with the STORE command, which follows the form:

STORE dataset INTO 'filename' USING function();

The provided function specifies how to serialize the data to the file; if it is omitted, then a default serializer will write plain-text tab-delimited files.
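
To show what plain-text tab-delimited output looks like, here is a minimal Python sketch of such a serializer. The function name store and the sample tuples are invented for illustration, and an in-memory buffer stands in for the output file.

```python
import io

def store(records, out, delimiter="\t"):
    # Write one line per tuple, with fields separated by the delimiter.
    for record in records:
        out.write(delimiter.join(str(field) for field in record) + "\n")

buffer = io.StringIO()
store([("07", "pig latin", 2.5), ("08", "hadoop", 3.1)], buffer)
print(buffer.getvalue(), end="")
```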

A number of additional operators exist for the purposes of removing duplicate records, sorting records, etc. This paper explains the additional operators and expression syntaxes in greater detail.

Loading Data Into Pig

The first step in using Pig is to load data into a program. Pig provides a LOAD statement for this purpose. Its format is: result = LOAD 'filename' USING fn() AS (field1, field2, ...).

This statement returns a bag of values of all the data contained in the named file. Each record in the bag is a tuple, with the fields named by field1, field2, etc. The fn() is a user-provided function that reads in the data. Pig supports user-provided Java code throughout to handle the application-specific bits of parsing. Pig Latin itself is the "glue" that then holds these application-specific functions together, routing records and other data between them.

An example data loading command (taken from this paper on Pig) is:

queries = LOAD 'query_log.txt'
          USING myLoad()
          AS (userId, queryString, timestamp);

A user-defined load function (e.g., myLoad()) is optional: a default function for loading data exists, which parses tab-delimited records. If the programmer does not specify field names in the AS clause, the fields are addressed by position: $0, $1, and so forth.

The default loader is called PigStorage(). This loader can read files containing character-delimited tuple records. These tuples must contain only atomic values; e.g., cat, turtle, fish. Other loaders are listed in the PigBuiltins page of the Pig wiki. PigStorage() takes as an argument the character to use to delimit fields. For example, to load a table of three tab-delimited fields, the following statement can be used:

data = LOAD 'tab_delim_data.txt' USING PigStorage('\t') AS (user, time, query);

A different argument could be passed to PigStorage() to read comma- or space-delimited fields.
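
The effect of a character-delimited loader, with and without field names, can be sketched in Python as follows. The function name load and the sample lines are assumptions for illustration; this is not PigStorage's actual code.

```python
def load(lines, delimiter="\t", fields=None):
    # Split each line on the delimiter. With field names, yield dicts keyed
    # by name; without, yield tuples addressed by position ($0, $1, ...).
    for line in lines:
        values = tuple(line.rstrip("\n").split(delimiter))
        yield dict(zip(fields, values)) if fields else values

lines = ["u1\t970916071503\tpig latin\n", "u2\t970916072011\thadoop\n"]

named = list(load(lines, fields=("user", "time", "query")))
positional = list(load(lines))
print(named[0]["query"])  # pig latin
print(positional[1][0])   # u2

# A different delimiter handles comma-separated input.
print(list(load(["a,b,c\n"], delimiter=",")))  # [('a', 'b', 'c')]
```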

Pig Latin Data Types

Values in Pig Latin can be expressed by four basic data types:

* An atom is any atomic value, e.g., "fish".
* A tuple is a record of multiple values with fixed arity, e.g., ("dog", "sparky").
* A data bag is a collection of an arbitrary number of values, e.g., {("dog", "sparky"), ("fish", "goldie")}. Data bags support a scan operation for iterating through their contents.
* A data map is a collection with a lookup function translating keys to values, e.g., ["age" : 25].

All data types are fully nestable; bags may contain tuples, and maps may contain bags or other maps, etc. This differs from a traditional database model, where data must be normalized into lists of atoms. By allowing data types to be composed in this manner, Pig queries line up better to the conceptual model of the data held by the programmer. Data types may also be heterogeneous. For example, the fields of a tuple may each have different types; some may be atoms, others may be more tuples, etc. The values in a bag may hold different types, as may the values in data maps. These can vary from one record to the next in the bag. Data map keys must be atoms, for efficiency reasons.
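
For readers more familiar with general-purpose languages, the four types and their nesting can be loosely modeled with Python built-ins. The mapping below (string for atom, tuple for tuple, list for bag, dict for map) is an analogy chosen for illustration, not a statement about how Pig stores data.

```python
atom = "fish"
record = ("dog", "sparky")                     # tuple: fixed arity
bag = [("dog", "sparky"), ("fish", "goldie")]  # bag: any number of values
data_map = {"age": 25}                         # map: atom keys to values

# Nesting: a tuple holding an atom, a bag, and a map that itself holds a bag.
person = ("Alice", bag, {"age": 25, "pets": bag})

for kind, name in person[1]:  # scanning a bag
    print(kind, name)
print(person[2]["age"])       # map lookup: 25
```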

Pig Latin

The programming language used to write Pig queries is called Pig Latin.

The MapReduce programming model can be thought of as composed of three distinct phases:

1. Process input records
2. Form groups of related records
3. Process groups into outputs

In MapReduce, the first two of these steps are handled by the mapper, and the third step is handled by the reducer. Pig Latin exposes explicit primitives that perform actions from each phase. These primitives can be composed and reordered. Furthermore, it includes additional primitives to handle operations such as filtering and joining data sets.
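
The three phases can be sketched as a tiny single-process Python function. This is a conceptual model only: the names run_phases, process_input, and process_group are hypothetical, and nothing here reflects how Hadoop distributes the work.

```python
from collections import defaultdict

def run_phases(records, process_input, process_group):
    # Phase 1: turn each input record into zero or more (key, value) pairs.
    pairs = [pair for record in records for pair in process_input(record)]
    # Phase 2: form groups of values that share a key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # Phase 3: turn each group into an output record.
    return [process_group(key, values) for key, values in sorted(groups.items())]

lines = ["pig latin", "pig farm"]
result = run_phases(
    lines,
    process_input=lambda line: [(word, 1) for word in line.split()],
    process_group=lambda word, ones: (word, sum(ones)),
)
print(result)  # [('farm', 1), ('latin', 1), ('pig', 2)]
```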

Introduction

Pig is a platform for analyzing large data sets. Pig's language, Pig Latin, lets you specify a sequence of data transformations such as merging data sets, filtering them, and applying functions to records or groups of records. Users can create their own functions to do special-purpose processing.

Pig Latin programs execute in a distributed fashion on a cluster. Our current implementation compiles Pig Latin programs into Map/Reduce jobs, and executes them using Hadoop on Kryptonite.

The purpose of Pig is to answer queries over semi-structured data such as log files. Large volumes of data are in mostly-organized formats such as log files, which define a set of standard fields for each entry. While the MapReduce programming model on top of Hadoop provides a convenient mechanism for delivering a large volume of log-structured information to an analysis program, writing analyses of mostly-structured information involves writing a large amount of tedious processing code.

Pig is a high-level language for writing queries over this sort of data. A query planner then compiles queries written in this language (called "Pig Latin") into maps and reduces which are then executed on a Hadoop cluster.

Anything which could be written in Pig can also be implemented as straight Java-based Hadoop MapReduce. But while individual programmers could develop their own suite of data analysis functions such as join, order by, etc., this requires individual programmers to develop their own (non-standard) libraries, and test them. Pig provides a tested and supported suite of the most common data-aggregation functions. It also allows programmers to provide their own application-specific code for purposes of loading and saving data, as well as for performing more complicated record-by-record evaluations.