Process Data as Job Steps in EMR
We can submit jobs and interact directly with the data frameworks that are installed on the Amazon EMR cluster. Alternatively, we can submit one or more ordered steps to an Amazon EMR cluster. Each step is a unit of work that contains instructions to manipulate data for processing by a data framework installed on the cluster.
Let us now process our data as job steps in EMR. The following example uses four steps:
- Copy input dataset from S3 to HDFS using S3DistCp
- Load the input data into external and Hive-managed tables by using a Hive program.
- Transform the sales and stocks staging data into Hive fact tables by using a Pig program.
- Process the intermediate (fact and dimension) data by using a Spark program.
Go to the EMR Steps tab and add a step:
- Name: Copy Data from S3 to HDFS using S3DistCp
- JAR location: command-runner.jar
- Arguments: s3-dist-cp --src=s3://aws-bda-demo/datasets/ --dest=hdfs:///datasets
- Action on failure: Continue
Next, let's add another EMR step. Let's use a Hive SQL script to create the staging (external) and analytics tables and load data into them.
- Name: Create & Load Hive Dimension Tables; Create Hive Fact Tables
- JAR location: command-runner.jar
- Arguments: hive-script --run-hive-script --args -f s3://aws-bda-demo/scripts/tables.hql
- Action on failure: Continue
-- tables.hql
-- Creates the staging databases/tables (external, pipe-delimited text over
-- HDFS) and the analytics tables (managed, Snappy-compressed ORC), and loads
-- the dimension tables. Fact tables are only created here.

-- Databases
-- IF NOT EXISTS keeps the script re-runnable: without it, a second run of
-- this step fails on the very first statement and no later statement runs.
CREATE DATABASE IF NOT EXISTS sales_staging;
CREATE DATABASE IF NOT EXISTS sales_analytics;
-- Dates
-- Staging table over the pipe-delimited text files in /datasets/dates/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_dates (
    year_number INT,
    month_number INT,
    day_of_year_number INT,
    day_of_month_number INT,
    day_of_week_number INT,
    week_of_year_number INT,
    day_name VARCHAR(20),
    month_name VARCHAR(20),
    quarter_number INT,
    quarter_name VARCHAR(2),
    year_quarter_name VARCHAR(10),
    weekend_ind VARCHAR(1),
    days_in_month_qty INT,
    date_sk INT,
    day_desc VARCHAR(10),
    week_sk INT,
    day_date DATE,
    week_name VARCHAR(10),
    week_of_month_number INT,
    week_of_month_name VARCHAR(10),
    month_sk INT,
    quarter_sk INT,
    year_sk INT,
    year_sort_number VARCHAR(4),
    day_of_week_sort_name VARCHAR(10)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/dates/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Date dimension: same columns as the staging table, stored as
-- Snappy-compressed ORC.
CREATE TABLE IF NOT EXISTS sales_analytics.dim_date (
    year_number INT,
    month_number INT,
    day_of_year_number INT,
    day_of_month_number INT,
    day_of_week_number INT,
    week_of_year_number INT,
    day_name VARCHAR(20),
    month_name VARCHAR(20),
    quarter_number INT,
    quarter_name VARCHAR(2),
    year_quarter_name VARCHAR(10),
    weekend_ind VARCHAR(1),
    days_in_month_qty INT,
    date_sk INT,
    day_desc VARCHAR(10),
    week_sk INT,
    day_date DATE,
    week_name VARCHAR(10),
    week_of_month_number INT,
    week_of_month_name VARCHAR(10),
    month_sk INT,
    quarter_sk INT,
    year_sk INT,
    year_sort_number VARCHAR(4),
    day_of_week_sort_name VARCHAR(10)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Full reload of the dimension. Columns are listed explicitly rather than
-- SELECT * because Hive maps INSERT columns by position: if either table's
-- column order ever drifts, values could land in the wrong target column.
INSERT OVERWRITE TABLE sales_analytics.dim_date
SELECT
    year_number,
    month_number,
    day_of_year_number,
    day_of_month_number,
    day_of_week_number,
    week_of_year_number,
    day_name,
    month_name,
    quarter_number,
    quarter_name,
    year_quarter_name,
    weekend_ind,
    days_in_month_qty,
    date_sk,
    day_desc,
    week_sk,
    day_date,
    week_name,
    week_of_month_number,
    week_of_month_name,
    month_sk,
    quarter_sk,
    year_sk,
    year_sort_number,
    day_of_week_sort_name
FROM sales_staging.ext_dates;

-- Column-level statistics for the query optimizer.
ANALYZE TABLE sales_analytics.dim_date COMPUTE STATISTICS FOR COLUMNS;
-- Showroom
-- Staging table over the pipe-delimited text files in /datasets/showroom/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_showroom (
    id INT,
    code VARCHAR(40),
    name VARCHAR(50),
    operation_date DATE,
    staff_count INT,
    country VARCHAR(50),
    state VARCHAR(50),
    address VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/showroom/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Showroom dimension: same columns as the staging table, stored as
-- Snappy-compressed ORC.
CREATE TABLE IF NOT EXISTS sales_analytics.dim_showroom (
    id INT,
    code VARCHAR(40),
    name VARCHAR(50),
    operation_date DATE,
    staff_count INT,
    country VARCHAR(50),
    state VARCHAR(50),
    address VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Full reload of the dimension. Columns are listed explicitly rather than
-- SELECT * because Hive maps INSERT columns by position.
INSERT OVERWRITE TABLE sales_analytics.dim_showroom
SELECT
    id,
    code,
    name,
    operation_date,
    staff_count,
    country,
    state,
    address,
    update_date,
    create_date
FROM sales_staging.ext_showroom;

-- Column-level statistics for the query optimizer.
ANALYZE TABLE sales_analytics.dim_showroom COMPUTE STATISTICS FOR COLUMNS;
-- Customer
-- Staging table over the pipe-delimited text files in /datasets/customer/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_customer (
    id INT,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    gender VARCHAR(50),
    dob DATE,
    company VARCHAR(50),
    job VARCHAR(50),
    email VARCHAR(50),
    country VARCHAR(50),
    state VARCHAR(50),
    address VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/customer/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Customer dimension: same columns as the staging table, stored as
-- Snappy-compressed ORC.
CREATE TABLE IF NOT EXISTS sales_analytics.dim_customer (
    id INT,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    gender VARCHAR(50),
    dob DATE,
    company VARCHAR(50),
    job VARCHAR(50),
    email VARCHAR(50),
    country VARCHAR(50),
    state VARCHAR(50),
    address VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Full reload of the dimension. Columns are listed explicitly rather than
-- SELECT * because Hive maps INSERT columns by position.
INSERT OVERWRITE TABLE sales_analytics.dim_customer
SELECT
    id,
    first_name,
    last_name,
    gender,
    dob,
    company,
    job,
    email,
    country,
    state,
    address,
    update_date,
    create_date
FROM sales_staging.ext_customer;

-- Column-level statistics for the query optimizer.
ANALYZE TABLE sales_analytics.dim_customer COMPUTE STATISTICS FOR COLUMNS;
-- Product
-- Staging table over the pipe-delimited text files in /datasets/product/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_product (
    id INT,
    code VARCHAR(50),
    category VARCHAR(6),
    make VARCHAR(50),
    model VARCHAR(50),
    year VARCHAR(50),
    color VARCHAR(50),
    price INT,
    currency VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/product/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Product dimension: same columns as the staging table, stored as
-- Snappy-compressed ORC.
CREATE TABLE IF NOT EXISTS sales_analytics.dim_product (
    id INT,
    code VARCHAR(50),
    category VARCHAR(6),
    make VARCHAR(50),
    model VARCHAR(50),
    year VARCHAR(50),
    color VARCHAR(50),
    price INT,
    currency VARCHAR(50),
    update_date TIMESTAMP,
    create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Full reload of the dimension. Columns are listed explicitly rather than
-- SELECT * because Hive maps INSERT columns by position.
INSERT OVERWRITE TABLE sales_analytics.dim_product
SELECT
    id,
    code,
    category,
    make,
    model,
    year,
    color,
    price,
    currency,
    update_date,
    create_date
FROM sales_staging.ext_product;

-- Column-level statistics for the query optimizer.
ANALYZE TABLE sales_analytics.dim_product COMPUTE STATISTICS FOR COLUMNS;
-- Sales
-- Staging table over the pipe-delimited text files in /datasets/sales/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_sales (
    id            INT,
    order_number  VARCHAR(50),
    customer_id   INT,
    showroom_id   INT,
    product_id    INT,
    quantity      INT,
    discount      INT,
    amount        INT,
    delivered     VARCHAR(50),
    card_type     VARCHAR(50),
    card_number   VARCHAR(50),
    txn_date      DATE,
    update_date   TIMESTAMP,
    create_date   TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/sales/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Sales fact table, partitioned by transaction date (txn_date is therefore a
-- partition column, not a regular column). It is created empty here and
-- populated by the Pig ETL step (etl.pig), which also derives net_amount.
CREATE TABLE IF NOT EXISTS sales_analytics.fact_sales (
    id            INT,
    order_number  VARCHAR(50),
    customer_id   INT,
    showroom_id   INT,
    product_id    INT,
    quantity      INT,
    discount      INT,
    amount        INT,
    net_amount    INT,
    delivered     VARCHAR(50),
    card_type     VARCHAR(50),
    card_number   VARCHAR(50),
    update_date   TIMESTAMP,
    create_date   TIMESTAMP
)
PARTITIONED BY (txn_date DATE)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Stocks
-- Staging table over the pipe-delimited text files in /datasets/stocks/;
-- the first line of each file is a header and is skipped.
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_stocks (
    id            INT,
    showroom_id   INT,
    product_id    INT,
    quantity      INT,
    stock_date    DATE,
    update_date   TIMESTAMP,
    create_date   TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/stocks/'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Stocks fact table, partitioned by stock_date. It is created empty here and
-- populated by the Pig ETL step (etl.pig), which derives stock_amount.
CREATE TABLE IF NOT EXISTS sales_analytics.fact_stocks (
    id            INT,
    showroom_id   INT,
    product_id    INT,
    quantity      INT,
    stock_amount  INT,
    update_date   TIMESTAMP,
    create_date   TIMESTAMP
)
PARTITIONED BY (stock_date DATE)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
Next, let's add another EMR step. Let's use a Pig script to perform some ETL.
- Name: Pig ETL Hive Fact Tables
- JAR location: command-runner.jar
- Arguments: pig-script --run-pig-script --args -useHCatalog -f s3://aws-bda-demo/scripts/etl.pig
- Action on failure: Continue
-- etl.pig
-- Populates the Hive fact tables from the staging tables through HCatalog
-- (the step runs Pig with -useHCatalog). Every field below is qualified with
-- its source relation so it is obvious which side of each join it comes from.

-- NOTE(review): these look like they disable EMR's direct output committer
-- for S3/EMRFS writes; confirm against the EMR Pig documentation.
SET mapred.output.direct.NativeS3FileSystem false;
SET mapred.output.direct.EmrFileSystem false;

-- Product: loaded once and joined by both fact builds (supplies price).
ext_product = LOAD 'sales_staging.ext_product' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Sales: inner join, so sales rows with no matching product are dropped.
-- amount is recomputed from the product price (ext_sales.amount is unused);
-- txn_date feeds the fact_sales partition column.
ext_sales = LOAD 'sales_staging.ext_sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
sales_product = JOIN ext_sales BY (product_id), ext_product BY (id) PARALLEL 2;
fact_sales = FOREACH sales_product GENERATE
    ext_sales::id           AS id,
    ext_sales::order_number AS order_number,
    ext_sales::customer_id  AS customer_id,
    ext_sales::showroom_id  AS showroom_id,
    ext_sales::product_id   AS product_id,
    ext_sales::quantity     AS quantity,
    ext_sales::discount     AS discount,
    ext_product::price * ext_sales::quantity AS amount,
    (ext_product::price * ext_sales::quantity) - ext_sales::discount AS net_amount,
    ext_sales::delivered    AS delivered,
    ext_sales::card_type    AS card_type,
    ext_sales::card_number  AS card_number,
    ext_sales::update_date  AS update_date,
    ext_sales::create_date  AS create_date,
    ext_sales::txn_date     AS txn_date;
STORE fact_sales INTO 'sales_analytics.fact_sales' USING org.apache.hive.hcatalog.pig.HCatStorer();

-- Stocks: inner join on product; stock value = product price * quantity.
-- stock_date feeds the fact_stocks partition column.
ext_stocks = LOAD 'sales_staging.ext_stocks' USING org.apache.hive.hcatalog.pig.HCatLoader();
stocks_product = JOIN ext_stocks BY (product_id), ext_product BY (id) PARALLEL 2;
fact_stocks = FOREACH stocks_product GENERATE
    ext_stocks::id          AS id,
    ext_stocks::showroom_id AS showroom_id,
    ext_stocks::product_id  AS product_id,
    ext_stocks::quantity    AS quantity,
    ext_product::price * ext_stocks::quantity AS stock_amount,
    ext_stocks::update_date AS update_date,
    ext_stocks::create_date AS create_date,
    ext_stocks::stock_date  AS stock_date;
STORE fact_stocks INTO 'sales_analytics.fact_stocks' USING org.apache.hive.hcatalog.pig.HCatStorer();
Next, add our final EMR step.
- Name: PySpark Create Sales View
- JAR location: command-runner.jar
- Arguments: spark-submit s3://aws-bda-demo/scripts/etl.py
- Action on failure: Continue
# etl.py
# Joins the sales fact table to its product, showroom, customer and date
# dimensions and materializes the result as the table
# sales_analytics.vw_sales (a table, despite the vw_ prefix).
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

# NOTE(review): this only affects databases/tables created without an explicit
# location; sales_analytics was created by the Hive step. Confirm that a
# path relative to the driver's working directory is really intended here.
warehouse_location = abspath('sales_analytics')

spark = (
    SparkSession.builder
    .appName("sales_analytics")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

# Inner joins: sales rows without a matching dimension row are excluded.
SALES_QUERY = """
SELECT
    product.category,
    product.make,
    product.color,
    showroom.name AS showroom_name,
    showroom.state AS showroom_state,
    customer.gender,
    customer.state AS customer_state,
    sales.card_type,
    sales.quantity,
    sales.amount,
    sales.discount,
    sales.net_amount,
    sales.txn_date,
    dates.date_sk
FROM sales_analytics.fact_sales AS sales
INNER JOIN sales_analytics.dim_product AS product
    ON sales.product_id = product.id
INNER JOIN sales_analytics.dim_showroom AS showroom
    ON sales.showroom_id = showroom.id
INNER JOIN sales_analytics.dim_customer AS customer
    ON sales.customer_id = customer.id
INNER JOIN sales_analytics.dim_date AS dates
    ON sales.txn_date = dates.day_date
"""

sales_df = spark.sql(SALES_QUERY)
sales_df.createOrReplaceTempView("temp_vw_sales")

# Drop the table left by any previous run first: a plain CREATE TABLE ... AS
# SELECT fails when the target already exists, which made this step
# non-re-runnable.
spark.sql("DROP TABLE IF EXISTS sales_analytics.vw_sales")
spark.sql("CREATE TABLE sales_analytics.vw_sales as select * from temp_vw_sales")

spark.stop()
At any point in time, an EMR step is in one of the following lifecycle states: PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or INTERRUPTED.