Apache Spark is a fast, general-purpose engine for large-scale data processing over a distributed cluster. It has an advanced DAG execution engine that supports cyclic data flow and in-memory computing, and runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). To see Spark in action, let us first install Spark on Hadoop YARN.

Spark

Spark provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs, so applications can be written quickly in any of these languages. Spark also powers a rich stack of higher-level libraries, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for streaming data, which together support complex analytics.

Spark's resilient distributed dataset (RDD) is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system supported by Hadoop InputFormats, such as HDFS or HBase.
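RDDs support the same functional operations (map, filter, reduce) as ordinary Scala collections, just executed in parallel across the cluster. As a minimal plain-Scala illustration of those semantics (no Spark needed here; in spark-shell you would start from `sc.parallelize(data)` or `sc.textFile(path)` instead of a local Seq):

```scala
// A driver-side collection; sc.parallelize(data) would turn this into an RDD
val data = Seq(1, 2, 3, 4, 5)

// The same transformations an RDD supports, run on a local collection:
val squares = data.map(n => n * n)       // transformation: map
val evens   = squares.filter(_ % 2 == 0) // transformation: filter
val total   = squares.reduce(_ + _)      // action: reduce

println(squares) // List(1, 4, 9, 16, 25)
println(total)   // 55
```

On a real RDD these transformations are evaluated lazily and distributed across worker nodes, while actions such as reduce trigger the actual computation.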

Master Server Setup

We will configure our cluster to host the Spark Master server on our NameNode and the slaves on our DataNodes. Let's SSH into our NameNode and start the Spark installation. Select the stable Spark release compatible with your Hadoop version from the link below:
http://spark.apache.org/downloads.html
At the time of writing this article, Spark 2.0.0 is the latest stable version. We will install Spark under the /usr/local/ directory.

root@NameNode:~# cd /usr/local/
root@NameNode:/usr/local/# wget http://www-us.apache.org/dist/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz
root@NameNode:/usr/local/# tar -xzvf spark-2.0.0-bin-hadoop2.7.tgz >> /dev/null
root@NameNode:/usr/local/# mv spark-2.0.0-bin-hadoop2.7 /usr/local/spark
root@NameNode:/usr/local# rm spark-2.0.0-bin-hadoop2.7.tgz

Set the Spark environment variables in the .bashrc file. Append the lines below to the file and source it.

root@NameNode:/usr/local# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@NameNode:/usr/local# source ~/.bashrc

Next we need to configure the Spark environment script in order to set the Java home and the Hadoop configuration directory. Copy the template file, then open the spark-env.sh file and append the following lines.

root@NameNode:/usr/local# cd $SPARK_HOME/conf
root@NameNode:/usr/local/spark/conf# cp spark-env.sh.template spark-env.sh
root@NameNode:/usr/local/spark/conf# vi spark-env.sh

export JAVA_HOME=/usr/lib/jvm/java-7-oracle/jre
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_WORKER_CORES=6

Next we have to list the DataNodes that will act as slave servers. Open the slaves file and add the DataNode hostnames. In our case we have two DataNodes.

root@NameNode:/usr/local/spark/conf# vi slaves

DataNode1
DataNode2

Slave Server Setup

Now we have to configure our DataNodes to act as slave servers. In our case we have two DataNodes. We will secure-copy the Spark directory, with the binaries and configuration files, from the NameNode to the DataNodes.

root@NameNode:/usr/local/spark/conf# cd /usr/local 
root@NameNode:/usr/local# scp -r spark DataNode1:/usr/local
root@NameNode:/usr/local# scp -r spark DataNode2:/usr/local

Next we need to update the Spark environment configuration on all the DataNodes. Append the two lines below to the .bashrc file on each DataNode.

root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

root@DataNode1:~# source ~/.bashrc
root@DataNode1:~# exit

Repeat the above step for all the other DataNodes. We are now done with the installation and configuration, so it's time to start the Spark services.

root@NameNode:/usr/local# $SPARK_HOME/sbin/start-all.sh

Let us validate the services running in NameNode as well as in the DataNodes.

root@NameNode:/usr/local# jps

5721 NameNode
5943 SecondaryNameNode
6103 ResourceManager
6217 JobHistoryServer
6752 HQuorumPeer
6813 HMaster
16144 Master
16214 Jps
root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# jps

3869 DataNode
4004 NodeManager
4196 HRegionServer
10322 Worker
10429 Jps

root@DataNode1:~# exit

Browse the Spark Master UI at http://10.0.0.1:8080/ for details about the cluster resources (such as CPU and memory), worker nodes, running applications, etc. In our case we have set up the NameNode as the Spark Master node. The Spark application UI is available at http://10.0.0.1:4040.

Configure EdgeNode to access Spark

Let us configure the EdgeNode, or client node, to access Spark. Log on to the EdgeNode and secure-copy the Spark directory from the NameNode.

root@EdgeNode:~# cd /usr/local
root@EdgeNode:/usr/local# scp -r root@NameNode:/usr/local/spark /usr/local/

After that we will set the environment variables in the EdgeNode accordingly.

root@EdgeNode:/usr/local# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@EdgeNode:/usr/local# source ~/.bashrc

All set, so let's log in to our Spark shell and run an interactive map-reduce style program in Spark. First we will create an HDFS directory to hold the output file.

root@EdgeNode:/usr/local# hadoop fs -mkdir -p /spark_analytics

Log in to the Spark shell. We will run a map-reduce style program using Spark. The sample file for this is a data file in HDFS that we used during our PIG installation and basics tutorial.

root@EdgeNode:/usr/local# $SPARK_HOME/bin/spark-shell --master yarn

scala> val rdd_profit = sc.textFile("/pig_analytics/Profit_Q1.txt")
rdd_profit: org.apache.spark.rdd.RDD[String] = /pig_analytics/Profit_Q1.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd_profit.partitions.length
res0: Int = 2

scala> rdd_profit.count()
res1: Long = 17

scala> val header = rdd_profit.first()
header: String = Country|Date|Profit|Currency

scala> rdd_profit.take(3)
res2: Array[String] = Array(Country|Date|Profit|Currency, INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD)

scala> val rdd_profit_nohdr = rdd_profit.filter(row => row != header)
rdd_profit_nohdr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:28

scala> rdd_profit_nohdr.take(3)
res3: Array[String] = Array(INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD, SINGAPORE|2016-01-31|7000000|SGD)

scala> val rdd_profit_split = rdd_profit_nohdr.map(line => line.split("\\|"))
rdd_profit_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:30

scala> val rdd_profit_KV = rdd_profit_split.map{ (x) => (x(0), if( x(2).isEmpty) 0 else x(2).toInt ) }
rdd_profit_KV: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:32

scala> val rdd_profit_xAus = rdd_profit_KV.filter{case (key, value) => key != "AUSTRALIA"}
rdd_profit_xAus: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at filter at <console>:34

scala> val rdd_profit_q1 = rdd_profit_xAus.reduceByKey((v1,v2) => v1 + v2)
rdd_profit_q1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:34

scala> rdd_profit_q1.collect()
res4: Array[(String, Int)] = Array((US,35900000), (SINGAPORE,28600000), (INDIA,19200000))

scala> rdd_profit_q1.repartition(1).saveAsTextFile("/spark_analytics/profit")

scala> :quit
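The transformation chain above maps almost one-to-one onto plain Scala collection operations, which makes it easy to sanity-check the logic locally. Here is a minimal sketch using only the header and the three sample rows visible in the transcript above (no cluster required; with the full 17-line file the per-country sums would of course match the collect() output instead):

```scala
// Header plus the three sample rows shown by rdd_profit.take(3) above
val lines = Seq(
  "Country|Date|Profit|Currency",
  "INDIA|2016-01-31|4500000|INR",
  "US|2016-01-31|9000000|USD",
  "SINGAPORE|2016-01-31|7000000|SGD"
)

val header = lines.head

val profitByCountry = lines
  .filter(_ != header)                                    // drop the header row
  .map(_.split("\\|"))                                    // split on the pipe delimiter
  .map(x => (x(0), if (x(2).isEmpty) 0 else x(2).toInt))  // (country, profit) pairs
  .filter { case (country, _) => country != "AUSTRALIA" } // exclude AUSTRALIA
  .groupBy(_._1)                                          // local stand-in for reduceByKey
  .map { case (country, rows) => (country, rows.map(_._2).sum) }

println(profitByCountry("INDIA")) // 4500000
```

The groupBy-then-sum step plays the role of reduceByKey; on a real RDD, reduceByKey additionally combines values on each partition before shuffling, which is why it is preferred over groupByKey at cluster scale.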

Finally we will check the output files in HDFS.

root@EdgeNode:~# hadoop fs -ls /spark_analytics/profit
Found 2 items
-rw-r--r--   2 root supergroup          0  /spark_analytics/profit/_SUCCESS
-rw-r--r--   2 root supergroup         35  /spark_analytics/profit/part-00000

root@EdgeNode:~# hadoop fs -tail /spark_analytics/profit/part-00000
(US,35900000)
(SINGAPORE,28600000)
(INDIA,19200000)

So we have finally installed Spark on a distributed Hadoop YARN cluster. We have learned how to create a Spark RDD from an HDFS file and run a map-reduce style program using Scala to get the sum of the profit for each country.

