Logo DWBI.org Login / Sign Up
Sign Up
Have Login?
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.
Login
New Account?
Recovery
Go to Login
By continuing you indicate that you agree to Terms of Service and Privacy Policy of the site.
GCP Analytics

Process Data as Job Workflow in Google Dataproc

Updated on Oct 19, 2021

We can submit jobs and interact directly with the data frameworks that is installed in the Google Dataproc cluster. Alternatively, we can submit one or more Job steps or Workflow Job Template to a Google Dataproc cluster. Each step is a unit of work that contains instructions to manipulate data for processing by the data framework installed on the cluster.

Cloud Dataproc jobs lets us submit and manage any Hadoop, Hive, Spark or Pig job that runs in a Cloud Dataproc clusters.

Let us now Process our Data as Job Steps in Dataproc. The following is an example process using four steps:

  1. Copy input dataset from Cloud Storage to HDFS using DistCp
  2. Process the input data as external & Hive managed tables by using a Hive program.
  3. Process a second input dataset by using a Pig program.
  4. Process the intermediate dataset by using Spark program.

Click on the Submit Job button in the Dataproc cluster to add a Job Step.

Hadoop Job 1
Hadoop Job 1
  • Job ID: copy-data-from-GS-to-HDFS
  • Job Type: hadoop
  • Main class or JAR: org.apache.hadoop.tools.DistCp
  • Arguments: -- gs://gcp-bda-demo/datasets/ hdfs:///datasets
Hadoop Job 1 Success
Hadoop Job 1 Success
HDFS Datafiles
HDFS Datafiles

Next let's add another Job Step.

Hive Job 2
Hive Job 2
  • Job ID: create-load-Hive-Dim-Tables_create-Hive-Fact-Tables
  • Job Type: Hive
  • Query source type: Query file 
  • Query file: gs://gcp-bda-demo/scripts/tables.hql
--tables.hql

-- Databases
CREATE DATABASE sales_staging;
CREATE DATABASE sales_analytics;

-- Dates
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_dates (
  year_number INT,
  month_number INT,
  day_of_year_number INT,
  day_of_month_number INT,
  day_of_week_number INT,
  week_of_year_number INT,
  day_name VARCHAR(20),
  month_name VARCHAR (20),
  quarter_number INT,
  quarter_name VARCHAR(2),
  year_quarter_name VARCHAR(10),
  weekend_ind VARCHAR(1),
  days_in_month_qty INT,
  date_sk INT,
  day_desc VARCHAR(10),
  week_sk INT,
  day_date DATE,
  week_name VARCHAR(10),
  week_of_month_number INT,
  week_of_month_name VARCHAR(10),
  month_sk INT,
  quarter_sk INT,
  year_sk INT,
  year_sort_number VARCHAR(4),
  day_of_week_sort_name VARCHAR(10)
) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/dates/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_date(
  year_number INT,
  month_number INT,
  day_of_year_number INT,
  day_of_month_number INT,
  day_of_week_number INT,
  week_of_year_number INT,
  day_name VARCHAR(20),
  month_name VARCHAR (20),
  quarter_number INT,
  quarter_name VARCHAR(2),
  year_quarter_name VARCHAR(10),
  weekend_ind VARCHAR(1),
  days_in_month_qty INT,
  date_sk INT,
  day_desc VARCHAR(10),
  week_sk INT,
  day_date DATE,
  week_name VARCHAR(10),
  week_of_month_number INT,
  week_of_month_name VARCHAR(10),
  month_sk INT,
  quarter_sk INT,
  year_sk INT,
  year_sort_number VARCHAR(4),
  day_of_week_sort_name VARCHAR(10)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_date SELECT * FROM sales_staging.ext_dates;
ANALYZE TABLE sales_analytics.dim_date COMPUTE STATISTICS FOR COLUMNS;


-- Showroom
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_showroom (
  id INT,
  code VARCHAR(40),
  name VARCHAR(50),
  operation_date DATE,
  staff_count INT,
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/showroom/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_showroom(
  id INT,
  code VARCHAR(40),
  name VARCHAR(50),
  operation_date DATE,
  staff_count INT,
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_showroom SELECT * FROM sales_staging.ext_showroom;
ANALYZE TABLE sales_analytics.dim_showroom COMPUTE STATISTICS FOR COLUMNS;


-- Customer
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_customer (
  id INT,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  dob DATE,
  company VARCHAR(50),
  job VARCHAR(50),
  email VARCHAR(50),
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/customer/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_customer(
  id INT,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  dob DATE,
  company VARCHAR(50),
  job VARCHAR(50),
  email VARCHAR(50),
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_customer SELECT * FROM sales_staging.ext_customer;
ANALYZE TABLE sales_analytics.dim_customer COMPUTE STATISTICS FOR COLUMNS;


-- Product
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_product (
  id INT,
  code VARCHAR(50),
  category VARCHAR(6),
  make VARCHAR(50),
  model VARCHAR(50),
  year VARCHAR(50),
  color VARCHAR(50),
  price INT,
  currency VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/product/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_product(
  id INT,
  code VARCHAR(50),
  category VARCHAR(6),
  make VARCHAR(50),
  model VARCHAR(50),
  year VARCHAR(50),
  color VARCHAR(50),
  price INT,
  currency VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_product SELECT * FROM sales_staging.ext_product;
ANALYZE TABLE sales_analytics.dim_product COMPUTE STATISTICS FOR COLUMNS;


-- Sales
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_sales (
  id INT,
  order_number VARCHAR(50),
  customer_id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  discount INT,
  amount INT,
  delivered VARCHAR(50),
  card_type VARCHAR(50),
  card_number VARCHAR(50),
  txn_date DATE,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/sales/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.fact_sales (
  id INT,
  order_number VARCHAR(50),
  customer_id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  discount INT,
  amount INT,
  net_amount INT,
  delivered VARCHAR(50),
  card_type VARCHAR(50),
  card_number VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
PARTITIONED BY (txn_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");


-- Stocks
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_stocks (
  id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  stock_date DATE,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/stocks/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.fact_stocks (
  id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  stock_amount INT,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
PARTITIONED BY (stock_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");
Hive Job 2 Success
Hive Job 2 Success
Hive Job 2 Tez Hive Job
Hive Job 2 Tez Hive Job

Next let's add another Job Step.

Pig Job 3
Pig Job 3
  • Job ID: Pig-ETL-Hive-Fact-Tables
  • Job Type: Pig
  • Query source type: Query file 
  • Query file: gs://gcp-bda-demo/scripts/etl.pig
-- etl.pig

-- Product
ext_product = LOAD 'sales_staging.ext_product' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Sales
ext_sales = LOAD 'sales_staging.ext_sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
sales_product = JOIN ext_sales BY (product_id), ext_product BY (id) PARALLEL 2;
fact_sales = FOREACH sales_product GENERATE ext_sales::id AS id, order_number AS order_number, customer_id AS customer_id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, discount AS discount, price*quantity AS amount, (price*quantity)- discount AS net_amount, delivered AS delivered, card_type AS card_type, card_number AS card_number, ext_sales::update_date AS update_date, ext_sales::create_date AS create_date, txn_date AS txn_date;
STORE fact_sales INTO 'sales_analytics.fact_sales' USING org.apache.hive.hcatalog.pig.HCatStorer();

-- Stocks
ext_stocks = LOAD 'sales_staging.ext_stocks' USING org.apache.hive.hcatalog.pig.HCatLoader();
stocks_product = JOIN ext_stocks BY (product_id), ext_product BY (id) PARALLEL 2;
fact_stocks = FOREACH stocks_product GENERATE ext_stocks::id AS id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, price*quantity AS stock_amount, ext_stocks::update_date AS update_date, ext_stocks::create_date AS create_date, stock_date AS stock_date;
STORE fact_stocks INTO 'sales_analytics.fact_stocks' USING org.apache.hive.hcatalog.pig.HCatStorer();
Pig Job 3 MapReduce Job Success
Pig Job 3 MapReduce Job Success

Next add our final Job Step.

Spark Job 4
Spark Job 4
  • Job ID: PySpark-create-Sales-View
  • Job Type: PySpark
  • Main python file: gs://gcp-bda-demo/scripts/etl.py
# etl.py

from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

warehouse_location = abspath('sales_analytics')

spark = SparkSession \
  .builder \
  .appName("sales_analytics") \
  .config("spark.sql.warehouse.dir", warehouse_location) \
  .enableHiveSupport() \
  .getOrCreate()

# spark.sql("SELECT count(1), sum(net_amount) FROM sales_analytics.fact_sales").show()

sales_df = spark.sql("SELECT \
  product.category, product.make, product.color, \
  showroom.name as showroom_name, showroom.state as showroom_state, \
  customer.gender, customer.state as customer_state, \
  sales.card_type, sales.quantity, sales.amount, sales.discount, sales.net_amount, sales.txn_date, dates.date_sk \
  FROM sales_analytics.fact_sales as sales \
  INNER JOIN sales_analytics.dim_product product \
  ON sales.product_id = product.id \
  INNER JOIN sales_analytics.dim_showroom showroom \
  ON sales.showroom_id = showroom.id \
  INNER JOIN sales_analytics.dim_customer customer \
  ON sales.customer_id = customer.id \
  INNER JOIN sales_analytics.dim_date dates \
  ON sales.txn_date = dates.day_date \
")

sales_df.createOrReplaceTempView("temp_vw_sales") 

spark.sql("CREATE TABLE sales_analytics.vw_sales AS SELECT * FROM temp_vw_sales");
Spark Job 4 Success
Spark Job 4 Success
Dataproc Jobs Status
Dataproc Jobs Status
Data Analysis using Hive<br>
Data Analysis using Hive

Cloud Dataproc workflow templates provide a flexible and easy-to-use mechanism for managing and executing workflows. A workflow template is a reusable workflow configuration that defines a Directed Acyclic Graph (DAG) of jobs with information on where to run those jobs.