Apache Pig is a platform for analyzing large data sets. Pig Latin is the high-level programming language that lets us specify a sequence of data transformations, such as merging data sets, filtering them, grouping them, and applying functions to records or groups of records.

Pig comes with many built-in functions, and we can also create our own user-defined functions for special-purpose processing. Pig Latin programs are compiled into Map/Reduce jobs and executed in a distributed fashion on a Hadoop cluster. Let us first install Pig on our EdgeNode and then validate a simple Pig script execution.

Pig scripts can be executed in local mode or in Hadoop (MapReduce) mode. We are going to install Pig in Hadoop mode, i.e. we will execute Pig scripts as MapReduce jobs, and for that we need access to a Hadoop cluster and an HDFS installation, which we already have.
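As a quick reference (myscript.pig below is just a placeholder name), the execution mode can be selected with the -x option when invoking Pig:

pig -x local myscript.pig        # runs against the local file system, no Hadoop required
pig -x mapreduce myscript.pig    # the default; runs as MapReduce jobs against HDFS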

Install PIG

First we will log in to our EdgeNode via ssh. We already have a release of Hadoop installed and configured there. We will install Pig under the same base directory, /usr/local, where our Hadoop binaries, configuration files and other client tools already reside.

root@EdgeNode:~# cd /usr/local

Next, we will download a recent stable release of Pig from the Apache site below:
http://www-us.apache.org/dist/pig/pig-0.16.0/
Then follow the installation steps below:

root@EdgeNode:/usr/local# wget http://www-us.apache.org/dist/pig/pig-0.16.0/pig-0.16.0.tar.gz
root@EdgeNode:/usr/local# tar -xzvf pig-0.16.0.tar.gz >> /dev/null
root@EdgeNode:/usr/local# mv pig-0.16.0 /usr/local/pig
root@EdgeNode:/usr/local# rm pig-0.16.0.tar.gz

Next we will set the Pig environment variables in the .bashrc file. Append the lines below, then save and quit.

root@EdgeNode:/usr/local# vi ~/.bashrc


export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin

export CLASSPATH=$CLASSPATH:/usr/local/pig/lib/*:.
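# PIG_CLASSPATH points Pig at the Hadoop client configuration directory so that scripts run in mapreduce mode against our cluster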
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop

Source the environment file.

root@EdgeNode:/usr/local# source ~/.bashrc

Now let us validate that Pig is installed properly:

root@EdgeNode:/usr/local# cd $PIG_HOME/bin
root@EdgeNode:/usr/local/pig/bin# pig -version
Apache Pig version 0.16.0 (r1746530)
compiled Jun 01 2016, 23:10:49

Finally, we have installed and configured Pig. Now let us write a simple Pig Latin script and validate its execution.

Data Processing Task

We are going to use Pig as an ETL tool to extract data from a source, transform it according to our requirements and finally load it into a datastore. We will perform the following data manipulation operations using Pig Latin:

  • LOAD two sample datasets from the file system
  • FILTER tuples from one dataset
  • JOIN two datasets
  • Perform transformations on each row of the data using FOREACH
  • STORE the results to the file system
  • GROUP the data into another output dataset
  • DUMP or display the cumulative results on screen

Data Preparation

First let us prepare two dummy datasets as shown below. We will load both datasets into Pig: one is the Q1 profit of a company and the other is the currency exchange rate. We will filter out the new operating company AUSTRALIA from the profit dataset. Next we will perform a JOIN, calculate the profit amount for all countries in terms of USD and store the result, additionally adding the current date as a load-date metadata column. Finally we will generate a summary dataset per country with the period start and end dates along with the total and average Q1 profit, and display it on screen.

Profit_Q1.txt

Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD

Exchange_Rate.csv

Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439

Let's first create these sample data files on the local file system and then put them into HDFS. Follow the commands below:

root@EdgeNode:/usr/local/pig/bin# cd ~

root@EdgeNode:~# echo "Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD" >> Profit_Q1.txt


root@EdgeNode:~# echo "Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439" >> Exchange_Rate.csv
root@EdgeNode:~# hadoop fs -mkdir /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Profit_Q1.txt /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Exchange_Rate.csv /pig_analytics
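Optionally, we can take a quick look at the files in HDFS before writing the Pig script (the head count of 5 is arbitrary):

root@EdgeNode:~# hadoop fs -cat /pig_analytics/Profit_Q1.txt | head -5
root@EdgeNode:~# hadoop fs -cat /pig_analytics/Exchange_Rate.csv | head -5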

Pig Latin Script

Below is our Pig Latin Script to process the data.

root@EdgeNode:~# vi profit_analysis.pig
profit_analysis.pig

/* Script to Process Profit Analysis */

-- Extract Source Profit Data
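-- PigStorage('|') splits each line on the pipe delimiter and the AS clause assigns the schema; empty Profit values (the AUSTRALIA rows) are read as null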
in_profit = LOAD '/pig_analytics/Profit_Q1.txt' USING PigStorage('|') AS (country:chararray, date:chararray, profit:float, currency:chararray);

-- Filter Header & country not Australia
profit = FILTER in_profit BY (country != 'Country') AND (country != 'AUSTRALIA');

-- Extract Currency Exchange Data
in_xrate = LOAD '/pig_analytics/Exchange_Rate.csv' USING PigStorage(',') AS (exc_date:chararray, from_cur:chararray, to_cur:chararray, rate:float);

-- Filter Header
xrate = FILTER in_xrate BY (exc_date != 'Exc_Date');

-- Join dataset based on date & source currency
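-- LEFT OUTER keeps every profit row even when no exchange-rate row matches on date and currency (e.g. the US rows, whose profit is already in USD)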
profit_xrate = JOIN profit BY (date, currency) LEFT OUTER, xrate BY (exc_date, from_cur);

-- Calculate conversion amount
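-- Positional references: $0-$3 come from profit (country, date, profit, currency) and $4-$7 from xrate; the bincond ($7 IS NULL ? 1 : $7) falls back to a rate of 1 when the join found no matching rate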
profit_rate = FOREACH profit_xrate GENERATE $0 AS country, $3 AS curr, $1 AS date, $2 AS profit_base, $2 * ($7 IS NULL ? 1: $7) AS profit_usd, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;

-- Load final detail data into HDFS
STORE profit_rate INTO '/pig_analytics/out_profit_Q1_dtl' USING PigStorage(',');

-- Group dataset by Country
profit_by_country = GROUP profit_rate BY country;

-- Perform Aggregate operations based on Groups
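-- MIN/MAX on the chararray date yield the first and last period (yyyy-MM-dd sorts lexicographically); SUM/AVG aggregate the USD profit per country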
profit_country = FOREACH profit_by_country GENERATE group AS country, MIN(profit_rate.date) AS st_date, MAX(profit_rate.date) AS end_date, SUM(profit_rate.profit_usd) AS total_profit, AVG(profit_rate.profit_usd) AS avg_profit, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;

-- Load final summary data into HDFS
STORE profit_country INTO '/pig_analytics/out_profit_Q1' USING PigStorage(',');
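
One point to note before running the script: in mapreduce mode, STORE fails if the target directory already exists, so when re-running the script we first need to remove the previous output directories, for example:

root@EdgeNode:~# hadoop fs -rm -r /pig_analytics/out_profit_Q1_dtl /pig_analytics/out_profit_Q1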

root@EdgeNode:~# pig
grunt> exec /root/profit_analysis.pig
grunt> quit
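
As an aside, the script can also be submitted directly from the shell without opening Grunt, or executed with run instead of exec so that its aliases remain available in the Grunt session; a DUMP then displays the summary on screen, as mentioned earlier:

root@EdgeNode:~# pig /root/profit_analysis.pig

grunt> run /root/profit_analysis.pig
grunt> DUMP profit_country;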

Let us validate the data loaded in HDFS.

root@EdgeNode:~# hadoop fs -ls -R /pig_analytics
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1_dtl/part-r-00000
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1/part-r-00000
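
If needed, the result files can also be merged and copied back to the local file system (the local target paths below are just examples):

root@EdgeNode:~# hadoop fs -getmerge /pig_analytics/out_profit_Q1_dtl /root/profit_Q1_dtl.csv
root@EdgeNode:~# hadoop fs -getmerge /pig_analytics/out_profit_Q1 /root/profit_Q1_summary.csv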

Next we will learn how to provision streaming data using Apache Flume. Let's get started.

