Process Data as Job Workflow in Google Dataproc
We can submit jobs and interact directly with the data frameworks that are installed on a Google Dataproc cluster. Alternatively, we can submit one or more job steps, or a workflow template, to a Google Dataproc cluster. Each step is a unit of work that contains instructions for the installed data framework to process data.
Cloud Dataproc jobs let us submit and manage Hadoop, Spark, PySpark, Hive and Pig jobs, among other job types, that run on a Cloud Dataproc cluster.
Let us now process our data as job steps in Dataproc. The example uses four steps:
- Copy the input datasets from Cloud Storage to HDFS by using DistCp.
- Create external (staging) tables and Hive managed tables, and load the dimension tables, by using a Hive script.
- Populate the Hive fact tables from the staging tables by using a Pig script.
- Build a denormalized sales table from the fact and dimension tables by using a PySpark program.
On the Dataproc cluster page, click the Submit Job button to add the first job step:
- Job ID: copy-data-from-GS-to-HDFS
- Job Type: Hadoop
- Main class or JAR: org.apache.hadoop.tools.DistCp
- Arguments: gs://gcp-bda-demo/datasets/ and hdfs:///datasets (the DistCp source and target, entered as two separate arguments)
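The same step can also be submitted from code. Below is a minimal sketch, assuming the `google-cloud-dataproc` Python client library is installed and credentials are configured; the project, region and cluster names are hypothetical placeholders. It builds the Job resource as a plain dict and submits it with `JobControllerClient.submit_job_as_operation`, which returns an operation that completes when the job finishes.

```python
# Sketch: submit the DistCp step with the google-cloud-dataproc client library.
# PROJECT_ID, REGION and CLUSTER_NAME are hypothetical placeholders.

PROJECT_ID = "my-project"
REGION = "us-central1"
CLUSTER_NAME = "my-cluster"


def distcp_job(cluster_name: str, job_id: str, source: str, target: str) -> dict:
    """Build a Dataproc Job resource (as a plain dict) that runs DistCp."""
    return {
        "reference": {"job_id": job_id},
        "placement": {"cluster_name": cluster_name},
        "hadoop_job": {
            "main_class": "org.apache.hadoop.tools.DistCp",
            # Passed to DistCp unchanged: source directory, then target.
            "args": [source, target],
        },
    }


def submit_and_wait(project_id: str, region: str, job: dict, timeout_s: float = 3600):
    """Submit the job and wait up to timeout_s seconds for it to finish."""
    # Imported here so distcp_job() can be used without the library installed.
    from google.cloud import dataproc_v1

    client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    return operation.result(timeout=timeout_s)  # the finished Job resource


job = distcp_job(
    CLUSTER_NAME,
    "copy-data-from-GS-to-HDFS",
    "gs://gcp-bda-demo/datasets/",
    "hdfs:///datasets",
)
# submit_and_wait(PROJECT_ID, REGION, job)  # needs credentials and a running cluster
```

Job IDs must be unique within a project, so re-running the step with the same ID should be rejected; leaving out the `reference` key lets Dataproc generate an ID for each run.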
Next, let's add another job step:
- Job ID: create-load-Hive-Dim-Tables_create-Hive-Fact-Tables
- Job Type: Hive
- Query source type: Query file
- Query file: gs://gcp-bda-demo/scripts/tables.hql
-- tables.hql
-- Databases (IF NOT EXISTS lets the script be re-run)
CREATE DATABASE IF NOT EXISTS sales_staging;
CREATE DATABASE IF NOT EXISTS sales_analytics;
-- Dates
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_dates (
year_number INT,
month_number INT,
day_of_year_number INT,
day_of_month_number INT,
day_of_week_number INT,
week_of_year_number INT,
day_name VARCHAR(20),
month_name VARCHAR(20),
quarter_number INT,
quarter_name VARCHAR(2),
year_quarter_name VARCHAR(10),
weekend_ind VARCHAR(1),
days_in_month_qty INT,
date_sk INT,
day_desc VARCHAR(10),
week_sk INT,
day_date DATE,
week_name VARCHAR(10),
week_of_month_number INT,
week_of_month_name VARCHAR(10),
month_sk INT,
quarter_sk INT,
year_sk INT,
year_sort_number VARCHAR(4),
day_of_week_sort_name VARCHAR(10)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/dates/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.dim_date(
year_number INT,
month_number INT,
day_of_year_number INT,
day_of_month_number INT,
day_of_week_number INT,
week_of_year_number INT,
day_name VARCHAR(20),
month_name VARCHAR(20),
quarter_number INT,
quarter_name VARCHAR(2),
year_quarter_name VARCHAR(10),
weekend_ind VARCHAR(1),
days_in_month_qty INT,
date_sk INT,
day_desc VARCHAR(10),
week_sk INT,
day_date DATE,
week_name VARCHAR(10),
week_of_month_number INT,
week_of_month_name VARCHAR(10),
month_sk INT,
quarter_sk INT,
year_sk INT,
year_sort_number VARCHAR(4),
day_of_week_sort_name VARCHAR(10)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
INSERT OVERWRITE TABLE sales_analytics.dim_date SELECT * FROM sales_staging.ext_dates;
ANALYZE TABLE sales_analytics.dim_date COMPUTE STATISTICS FOR COLUMNS;
-- Showroom
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_showroom (
id INT,
code VARCHAR(40),
name VARCHAR(50),
operation_date DATE,
staff_count INT,
country VARCHAR(50),
state VARCHAR(50),
address VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/showroom/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.dim_showroom(
id INT,
code VARCHAR(40),
name VARCHAR(50),
operation_date DATE,
staff_count INT,
country VARCHAR(50),
state VARCHAR(50),
address VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
INSERT OVERWRITE TABLE sales_analytics.dim_showroom SELECT * FROM sales_staging.ext_showroom;
ANALYZE TABLE sales_analytics.dim_showroom COMPUTE STATISTICS FOR COLUMNS;
-- Customer
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_customer (
id INT,
first_name VARCHAR(50),
last_name VARCHAR(50),
gender VARCHAR(50),
dob DATE,
company VARCHAR(50),
job VARCHAR(50),
email VARCHAR(50),
country VARCHAR(50),
state VARCHAR(50),
address VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/customer/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.dim_customer(
id INT,
first_name VARCHAR(50),
last_name VARCHAR(50),
gender VARCHAR(50),
dob DATE,
company VARCHAR(50),
job VARCHAR(50),
email VARCHAR(50),
country VARCHAR(50),
state VARCHAR(50),
address VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
INSERT OVERWRITE TABLE sales_analytics.dim_customer SELECT * FROM sales_staging.ext_customer;
ANALYZE TABLE sales_analytics.dim_customer COMPUTE STATISTICS FOR COLUMNS;
-- Product
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_product (
id INT,
code VARCHAR(50),
category VARCHAR(6),
make VARCHAR(50),
model VARCHAR(50),
year VARCHAR(50),
color VARCHAR(50),
price INT,
currency VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/product/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.dim_product(
id INT,
code VARCHAR(50),
category VARCHAR(6),
make VARCHAR(50),
model VARCHAR(50),
year VARCHAR(50),
color VARCHAR(50),
price INT,
currency VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
INSERT OVERWRITE TABLE sales_analytics.dim_product SELECT * FROM sales_staging.ext_product;
ANALYZE TABLE sales_analytics.dim_product COMPUTE STATISTICS FOR COLUMNS;
-- Sales
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_sales (
id INT,
order_number VARCHAR(50),
customer_id INT,
showroom_id INT,
product_id INT,
quantity INT,
discount INT,
amount INT,
delivered VARCHAR(50),
card_type VARCHAR(50),
card_number VARCHAR(50),
txn_date DATE,
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/sales/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.fact_sales (
id INT,
order_number VARCHAR(50),
customer_id INT,
showroom_id INT,
product_id INT,
quantity INT,
discount INT,
amount INT,
net_amount INT,
delivered VARCHAR(50),
card_type VARCHAR(50),
card_number VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
PARTITIONED BY (txn_date DATE)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Stocks
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_stocks (
id INT,
showroom_id INT,
product_id INT,
quantity INT,
stock_date DATE,
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/stocks/'
TBLPROPERTIES ("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS sales_analytics.fact_stocks (
id INT,
showroom_id INT,
product_id INT,
quantity INT,
stock_amount INT,
update_date TIMESTAMP,
create_date TIMESTAMP
)
PARTITIONED BY (stock_date DATE)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
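Each dimension in tables.hql follows the same four-statement pattern: an external staging table over the pipe-delimited text files (skipping one header line), a managed ORC table with identical columns, an INSERT OVERWRITE that copies the rows, and ANALYZE TABLE for column statistics. The hypothetical Python helper below is a sketch that makes the pattern explicit by generating those statements for one dimension. The `target_name` parameter covers the dates table, whose managed table is `dim_date` rather than `dim_dates`. The fact tables do not fit this pattern, since they are partitioned and are loaded by the Pig step.

```python
from __future__ import annotations

from typing import Optional

# Hypothetical helper (not part of the original scripts): generates the
# staging -> ORC statements that tables.hql repeats for each dimension.


def dimension_hql(name: str, columns: list[tuple[str, str]],
                  target_name: Optional[str] = None) -> str:
    """Return HiveQL that stages '|'-delimited text and loads it into ORC."""
    cols = ",\n".join(f"{col} {col_type}" for col, col_type in columns)
    staging = f"sales_staging.ext_{name}"
    target = f"sales_analytics.dim_{target_name or name}"
    return "\n".join([
        # 1. External table over the text files DistCp copied to /datasets/<name>/.
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {staging} (\n{cols}\n)",
        "ROW FORMAT DELIMITED",
        "FIELDS TERMINATED BY '|'",
        "STORED AS TEXTFILE",
        f"LOCATION '/datasets/{name}/'",
        'TBLPROPERTIES ("skip.header.line.count"="1");',
        # 2. Managed ORC table with the same columns.
        f"CREATE TABLE IF NOT EXISTS {target} (\n{cols}\n)",
        "STORED AS ORC",
        'TBLPROPERTIES ("orc.compress"="SNAPPY");',
        # 3. Copy the rows across, then 4. gather column statistics.
        f"INSERT OVERWRITE TABLE {target} SELECT * FROM {staging};",
        f"ANALYZE TABLE {target} COMPUTE STATISTICS FOR COLUMNS;",
    ])


print(dimension_hql("showroom", [("id", "INT"), ("code", "VARCHAR(40)")]))
```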
Next, let's add another job step:
- Job ID: Pig-ETL-Hive-Fact-Tables
- Job Type: Pig
- Query source type: Query file
- Query file: gs://gcp-bda-demo/scripts/etl.pig
-- etl.pig
-- Product
ext_product = LOAD 'sales_staging.ext_product' USING org.apache.hive.hcatalog.pig.HCatLoader();
-- Sales
ext_sales = LOAD 'sales_staging.ext_sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
-- Inner join each sale to its product to look up the unit price
sales_product = JOIN ext_sales BY (product_id), ext_product BY (id) PARALLEL 2;
fact_sales = FOREACH sales_product GENERATE ext_sales::id AS id, order_number AS order_number, customer_id AS customer_id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, discount AS discount, price*quantity AS amount, (price*quantity)- discount AS net_amount, delivered AS delivered, card_type AS card_type, card_number AS card_number, ext_sales::update_date AS update_date, ext_sales::create_date AS create_date, txn_date AS txn_date;
STORE fact_sales INTO 'sales_analytics.fact_sales' USING org.apache.hive.hcatalog.pig.HCatStorer();
-- Stocks
ext_stocks = LOAD 'sales_staging.ext_stocks' USING org.apache.hive.hcatalog.pig.HCatLoader();
-- Inner join each stock record to its product to value the stock (price * quantity)
stocks_product = JOIN ext_stocks BY (product_id), ext_product BY (id) PARALLEL 2;
fact_stocks = FOREACH stocks_product GENERATE ext_stocks::id AS id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, price*quantity AS stock_amount, ext_stocks::update_date AS update_date, ext_stocks::create_date AS create_date, stock_date AS stock_date;
STORE fact_stocks INTO 'sales_analytics.fact_stocks' USING org.apache.hive.hcatalog.pig.HCatStorer();
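The Pig script performs an inner join of the staging sales and stocks rows with the product table, then computes `amount = price * quantity`, `net_amount = amount - discount` and `stock_amount = price * quantity`. The plain-Python sketch below reproduces only that row-level logic on a few hypothetical rows, to make the derived columns concrete. It is not how Pig executes the join, and it assumes product ids are unique (Pig's JOIN would emit one output row per matching product).

```python
# Plain-Python illustration of what etl.pig computes (not how Pig runs it).
# Rows are dicts; the sample values in the usage below are hypothetical.


def price_lookup(products):
    """Map product id -> price (assumes product ids are unique)."""
    return {p["id"]: p["price"] for p in products}


def build_fact_sales(sales, products):
    """Inner-join sales to products, then derive amount and net_amount."""
    prices = price_lookup(products)
    facts = []
    for row in sales:
        price = prices.get(row["product_id"])
        if price is None:
            continue  # inner join: sales with no matching product are dropped
        amount = price * row["quantity"]
        # Replaces the staging 'amount' column, as the Pig GENERATE does.
        facts.append({**row, "amount": amount, "net_amount": amount - row["discount"]})
    return facts


def build_fact_stocks(stocks, products):
    """Inner-join stocks to products, then derive stock_amount."""
    prices = price_lookup(products)
    return [
        {**row, "stock_amount": prices[row["product_id"]] * row["quantity"]}
        for row in stocks
        if row["product_id"] in prices
    ]


products = [{"id": 1, "price": 100}, {"id": 2, "price": 250}]
sales = [
    {"id": 10, "product_id": 1, "quantity": 3, "discount": 20, "amount": 0},
    {"id": 11, "product_id": 9, "quantity": 1, "discount": 0, "amount": 0},
]
print(build_fact_sales(sales, products))
# [{'id': 10, 'product_id': 1, 'quantity': 3, 'discount': 20, 'amount': 300, 'net_amount': 280}]
```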
Next, add our final job step:
- Job ID: PySpark-create-Sales-View
- Job Type: PySpark
- Main python file: gs://gcp-bda-demo/scripts/etl.py
# etl.py
from pyspark.sql import SparkSession

# Hive support lets Spark read and write the tables that the earlier Hive
# and Pig steps registered in the Hive metastore.
spark = (
    SparkSession.builder
    .appName("sales_analytics")
    .enableHiveSupport()
    .getOrCreate()
)

# spark.sql("SELECT count(1), sum(net_amount) FROM sales_analytics.fact_sales").show()

# Denormalize: join the sales fact table to its four dimension tables.
sales_df = spark.sql("""
    SELECT
        product.category, product.make, product.color,
        showroom.name AS showroom_name, showroom.state AS showroom_state,
        customer.gender, customer.state AS customer_state,
        sales.card_type, sales.quantity, sales.amount, sales.discount,
        sales.net_amount, sales.txn_date, dates.date_sk
    FROM sales_analytics.fact_sales AS sales
    INNER JOIN sales_analytics.dim_product product
        ON sales.product_id = product.id
    INNER JOIN sales_analytics.dim_showroom showroom
        ON sales.showroom_id = showroom.id
    INNER JOIN sales_analytics.dim_customer customer
        ON sales.customer_id = customer.id
    INNER JOIN sales_analytics.dim_date dates
        ON sales.txn_date = dates.day_date
""")

sales_df.createOrReplaceTempView("temp_vw_sales")
# Drop any previous result so the job can be re-run.
spark.sql("DROP TABLE IF EXISTS sales_analytics.vw_sales")
spark.sql("CREATE TABLE sales_analytics.vw_sales AS SELECT * FROM temp_vw_sales")
spark.stop()
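The PySpark step can likewise be submitted from code. Below is a minimal sketch, assuming the `google-cloud-dataproc` Python client library; the project, region and cluster names are hypothetical. `JobControllerClient.submit_job` returns as soon as Dataproc accepts the job, so the caller gets the job ID back immediately and can check progress later, for example on the console's Jobs page.

```python
from typing import Optional

# Sketch: submit etl.py as a PySpark job without waiting for it to finish.
# The cluster, project and region names used below are hypothetical placeholders.


def pyspark_job(cluster_name: str, main_python_file_uri: str,
                job_id: Optional[str] = None) -> dict:
    """Build a Dataproc Job resource (as a dict) for a PySpark step."""
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": main_python_file_uri},
    }
    if job_id is not None:
        # Omit job_id to let Dataproc generate a unique ID for every run.
        job["reference"] = {"job_id": job_id}
    return job


def submit(project_id: str, region: str, job: dict):
    """Submit the job and return (job ID, initial state) without waiting."""
    from google.cloud import dataproc_v1  # requires google-cloud-dataproc

    client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    submitted = client.submit_job(
        request={"project_id": project_id, "region": region, "job": job}
    )
    return submitted.reference.job_id, submitted.status.state


job = pyspark_job("my-cluster", "gs://gcp-bda-demo/scripts/etl.py")
# submit("my-project", "us-central1", job)
```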
Cloud Dataproc workflow templates provide a flexible and easy-to-use mechanism for managing and executing workflows. A workflow template is a reusable workflow configuration that defines a Directed Acyclic Graph (DAG) of jobs with information on where to run those jobs.
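As a minimal sketch, the four job steps of this example can be expressed as a workflow template, assuming the `google-cloud-dataproc` Python client library. Each job gets a `step_id`, and `prerequisite_step_ids` encode the DAG edges, so here every step waits for the previous one. The template is instantiated inline with `WorkflowTemplateServiceClient.instantiate_inline_workflow_template` on an existing cluster, chosen through a cluster selector on the `goog-dataproc-cluster-name` label; a `managed_cluster` placement would instead create a cluster for the run and delete it afterwards. The project, region, cluster name and the shortened step IDs are hypothetical (step IDs are kept short because workflow step IDs are length-limited). `execution_order` is a hypothetical helper that only checks the dependency order locally; Dataproc does the actual scheduling.

```python
# Sketch: the four job steps as an inline Dataproc workflow template.
# PROJECT_ID, REGION, CLUSTER_NAME and the short step IDs are hypothetical.

PROJECT_ID = "my-project"
REGION = "us-central1"
CLUSTER_NAME = "my-cluster"
SCRIPTS = "gs://gcp-bda-demo/scripts"


def sales_workflow(cluster_name: str) -> dict:
    """Build a WorkflowTemplate dict whose jobs form a linear DAG."""
    return {
        # Run on an existing cluster, matched by the name label Dataproc
        # attaches to its clusters.
        "placement": {
            "cluster_selector": {
                "cluster_labels": {"goog-dataproc-cluster-name": cluster_name}
            }
        },
        "jobs": [
            {
                "step_id": "copy-data",
                "hadoop_job": {
                    "main_class": "org.apache.hadoop.tools.DistCp",
                    "args": ["gs://gcp-bda-demo/datasets/", "hdfs:///datasets"],
                },
            },
            {
                "step_id": "hive-tables",
                "prerequisite_step_ids": ["copy-data"],
                "hive_job": {"query_file_uri": f"{SCRIPTS}/tables.hql"},
            },
            {
                "step_id": "pig-etl",
                "prerequisite_step_ids": ["hive-tables"],
                "pig_job": {"query_file_uri": f"{SCRIPTS}/etl.pig"},
            },
            {
                "step_id": "pyspark-sales-view",
                "prerequisite_step_ids": ["pig-etl"],
                "pyspark_job": {"main_python_file_uri": f"{SCRIPTS}/etl.py"},
            },
        ],
    }


def execution_order(template: dict) -> list:
    """Order step IDs so each step follows its prerequisites (topological sort)."""
    pending = {j["step_id"]: set(j.get("prerequisite_step_ids", []))
               for j in template["jobs"]}
    order = []
    while pending:
        done = set(order)
        ready = sorted(s for s, deps in pending.items() if deps <= done)
        if not ready:
            raise ValueError("prerequisite_step_ids contain a cycle or unknown step")
        for step in ready:
            order.append(step)
            del pending[step]
    return order


def run_workflow(project_id: str, region: str, template: dict, timeout_s: float = 7200):
    """Instantiate the template and wait up to timeout_s seconds for all steps."""
    from google.cloud import dataproc_v1  # requires google-cloud-dataproc

    client = dataproc_v1.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.instantiate_inline_workflow_template(
        request={"parent": f"projects/{project_id}/regions/{region}",
                 "template": template}
    )
    operation.result(timeout=timeout_s)


template = sales_workflow(CLUSTER_NAME)
print(execution_order(template))
# ['copy-data', 'hive-tables', 'pig-etl', 'pyspark-sales-view']
# run_workflow(PROJECT_ID, REGION, template)
```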