In this article we will load our final fact table, i.e. stocks.

Load Stocks Fact Table

Using Sqoop we will load the stocks data, both the initial/base load and the incremental dataset, from MySQL to HDFS. First we create a saved Sqoop job in the shared metastore:

sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" \
--create jb_stg_stocks \
-- import \
--bindir ./ \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://sandbox-hdp.hortonworks.com:3306/sales \
--username root \
--password-file /user/edw_user/sales/.password \
--table stocks \
--fetch-size 1000 \
--as-textfile \
--fields-terminated-by '|' \
--target-dir /user/edw_user/sales/staging/stocks \
--incremental append \
--check-column id \
--split-by id \
--num-mappers 2

Finally, list and verify the saved Sqoop job, then execute it to load the data from source to HDFS.

sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --list
sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --show jb_stg_stocks
sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --exec jb_stg_stocks
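
Optionally, verify that the imported data files have landed in the HDFS staging directory (a quick check, not part of the original steps):

hdfs dfs -ls /user/edw_user/sales/staging/stocks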

Now we will define a Hive external table over the stocks staging data, as well as the final Hive-managed ORC fact table. Connect to the Beeline CLI using edw_user as the username and hadoop as the password. We will connect to the Hive schema ‘sales_analytics’.

One-time Setup

beeline
!connect jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics edw_user

CREATE EXTERNAL TABLE IF NOT EXISTS ext_stocks (
id INT,
showroom_id INT,
product_id INT,
quantity INT,
stock_date DATE,
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/user/edw_user/sales/staging/stocks';

SELECT * FROM ext_stocks LIMIT 10;

Next we create an intermediate table to hold the transformed staging data, in which the product natural keys are replaced with the product surrogate keys.

CREATE TABLE IF NOT EXISTS stg_stocks (
id INT,
showroom_id INT,
product_key INT,
quantity INT,
update_date TIMESTAMP,
create_date TIMESTAMP,
stock_date DATE
)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");

Next we will define the Hive-managed ORC fact table, partitioned by stock date.

CREATE TABLE IF NOT EXISTS fact_stocks (
id INT,
showroom_id INT,
product_key INT,
quantity INT,
update_date TIMESTAMP,
create_date TIMESTAMP
)
PARTITIONED BY (stock_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");

!quit

Now we will define a Pig script to replace the product natural key with the product surrogate key and load the transformed stocks data into the intermediate staging table. This script will be used later in the Oozie workflow manager to schedule the load.

Initial/Delta Setup

vi /home/edw_user/sampledata/transform_stocks.pig

ext_stocks = LOAD 'sales_analytics.ext_stocks' USING org.apache.hive.hcatalog.pig.HCatLoader();
dim_product = LOAD 'sales_analytics.dim_product' USING org.apache.hive.hcatalog.pig.HCatLoader();
active_product = FILTER dim_product BY (active_flag == 'Y');
stocks_product = JOIN ext_stocks BY (product_id), active_product BY (id) PARALLEL 2;
stg_stocks = FOREACH stocks_product GENERATE ext_stocks::id AS id, showroom_id AS showroom_id, prod_key AS product_key, quantity AS quantity, ext_stocks::update_date AS update_date, ext_stocks::create_date AS create_date, stock_date AS stock_date;
STORE stg_stocks INTO 'sales_analytics.stg_stocks' USING org.apache.hive.hcatalog.pig.HCatStorer();
quit;

:wq


hdfs dfs -put /home/edw_user/sampledata/transform_stocks.pig /user/edw_user/sales/scripts

Execute the Pig Script to trigger the initial data transformation & loading.

pig -x tez -useHCatalog -f "/home/edw_user/sampledata/transform_stocks.pig"
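
Optionally, the number of rows loaded into stg_stocks can be checked quickly from Beeline (a sanity check, not part of the original flow, reusing the same connection details used elsewhere in this article):

beeline -u jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics -n edw_user -p hadoop -e "SELECT COUNT(*) FROM stg_stocks;"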

Finally we load data from the staging stocks table into the final fact table. This script will also be used later by Oozie.

vi /home/edw_user/sampledata/load_stocks.hql

set hive.execution.engine=tez;
set hive.optimize.sort.dynamic.partition=true;
set hive.exec.reducers.max=31;
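
-- Note (an addition, not part of the original script): if the dynamic partition
-- insert below is rejected under strict mode on your cluster, the standard Hive
-- settings below can be enabled as well:
-- set hive.exec.dynamic.partition=true;
-- set hive.exec.dynamic.partition.mode=nonstrict;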

SELECT * FROM stg_stocks LIMIT 10;

INSERT INTO TABLE fact_stocks PARTITION(stock_date) 
SELECT id, showroom_id, product_key, quantity, update_date, create_date, stock_date
FROM stg_stocks DISTRIBUTE BY stock_date;

ANALYZE TABLE fact_stocks PARTITION(stock_date) COMPUTE STATISTICS FOR COLUMNS;

:wq


hdfs dfs -put /home/edw_user/sampledata/load_stocks.hql /user/edw_user/sales/scripts

Execute the script to trigger the initial data load.

beeline -u jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics -n edw_user -p hadoop -d org.apache.hive.jdbc.HiveDriver -f "/home/edw_user/sampledata/load_stocks.hql"

Let us quickly check the data loaded.

beeline
!connect jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics edw_user

SELECT * FROM stg_stocks LIMIT 10;
SELECT * FROM fact_stocks LIMIT 10;
SELECT fact_stocks.* FROM dim_date, fact_stocks where day_date=stock_date and day_of_week_number=7;

select p.make, sum(s.quantity) as quantity
from fact_stocks s, dim_product p
where s.product_key = p.prod_key
group by p.make;
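
To confirm that the dynamic partitions on the fact table were created as expected, we can also list them (an optional check):

SHOW PARTITIONS fact_stocks;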

!quit

Finally, archive the stocks data files by moving them out of the staging directory.

vi /home/edw_user/sampledata/archive_stocks.sh

hdfs dfs -mkdir /user/edw_user/sales/archive/stocks/`date +%Y%m%d`
hdfs dfs -mv /user/edw_user/sales/staging/stocks/* /user/edw_user/sales/archive/stocks/`date +%Y%m%d`

:wq


hdfs dfs -put /home/edw_user/sampledata/archive_stocks.sh /user/edw_user/sales/scripts
sh /home/edw_user/sampledata/archive_stocks.sh
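
Optionally, verify that the staging files were moved into the dated archive directory (a quick check, assuming the paths used in the script above):

hdfs dfs -ls /user/edw_user/sales/archive/stocks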

exit

In the next article we will use Oozie Workflow Manager to define workflows to orchestrate the daily data loading mechanism.

