Logo DWBI.org Login / Sign Up
Sign Up
Have Login?
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.
Login
New Account?
Recovery
Go to Login
By continuing you indicate that you agree to Terms of Service and Privacy Policy of the site.
Big Data

Install SPARK in Hadoop Cluster

Updated on Oct 03, 2020

Apache Spark is a fast and general purpose engine for large-scale data processing over a distributed cluster. Apache Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing. Spark run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). To check SPARK in action let us first install SPARK on Hadoop YARN.

Apache Spark

SPARK provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. Hence applications can be written very quickly using any of these languages. Spark powers a rich set of stack of libraries/higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for streaming data in order to perform any complex analytics.

Spark resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs- parallelizing an existing collection in your driver program, or referencing a dataset in Hadoop InputFormat like HDFS, HBase.

Master Server Setup

We will configure our cluster to host the Spark Master Server in our NameNode & Slaves in our DataNodes. Lets ssh login to our NameNode & start the Spark installation. Select the Hadoop Version Compatible Spark Stable Release from the below link
http://spark.apache.org/downloads.html

In the time of writing this article, Spark 2.0.0 is the latest stable version. We will install Spark under /usr/local/ directory.

root@NameNode:~# cd /usr/local/
root@NameNode:/usr/local/# wget http://www-us.apache.org/dist/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz
root@NameNode:/usr/local/# tar -xzvf spark-2.0.0-bin-hadoop2.7.tgz >> /dev/null
root@NameNode:/usr/local/# mv spark-2.0.0-bin-hadoop2.7 /usr/local/spark
root@NameNode:/usr/local# rm spark-2.0.0-bin-hadoop2.7.tgz

Set the Spark environment variables in .bashrc file. Append below lines to the file and source the environment file.

root@NameNode:/usr/local# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

root@NameNode:/usr/local# source ~/.bashrc

Next we need to configure Spark environment script in order to set the Java Home & Hadoop Configuration Directory. Copy the template file and then open spark-env.sh file and append the lines to the file.

root@NameNode:/usr/local# cd $SPARK_HOME/conf
root@NameNode:/usr/local/spark/conf# cp spark-env.sh.template spark-env.sh
root@NameNode:/usr/local/spark/conf# vi spark-env.sh

export JAVA_HOME=/usr/lib/jvm/java-7-oracle/jre
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_WORKER_CORES=6

Next we have to list down DataNodes which will act as the Slave server. Open the slaves file & add the datanode hostnames. In our case we have two data nodes.

root@NameNode:/usr/local/spark/conf# vi slaves

DataNode1
DataNode2

Slave Server Setup

Now we have to configure our DataNodes to act as Slave Servers. In our case we have two DataNodes. We will secure copy the spark directory with the binaries and configuration files from the NameNode to the DataNodes.

root@NameNode:/usr/local/spark/conf# cd /usr/local 
root@NameNode:/usr/local# scp -r spark DataNode1:/usr/local
root@NameNode:/usr/local# scp -r spark DataNode2:/usr/local

Next we need to update the Environment configuration of Spark in all the DataNodes. Append the below two lines in the .bashrc files in both the DataNodes.

root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

root@DataNode1:~# source ~/.bashrc
root@DataNode1:~# exit

Repeat the above step for all the other DataNodes. Well we are done with the installation & configuration. So it's time to start the SPARK services.

root@NameNode:/usr/local# $SPARK_HOME/sbin/start-all.sh

Let us validate the services running in NameNode as well as in the DataNodes.

root@NameNode:/usr/local# jps

5721 NameNode
5943 SecondaryNameNode
6103 ResourceManager
6217 JobHistoryServer
6752 HQuorumPeer
6813 HMaster
16144 Master
16214 Jps
root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# jps

3869 DataNode
4004 NodeManager
4196 HRegionServer
10322 Worker
10429 Jps

root@DataNode1:~# exit

Browse the Spark Master UI for details about the cluster resources (like CPU, memory), worker nodes, running application, segments, etc. at http://10.0.0.1:8080/

In our case we have setup the NameNode as Spark Master Node. Also the Spark Application UI is available at http://10.0.0.1:4040.

Configure EdgeNode to access SPARK

Let us configure of EdgeNode or Client Node to access Spark. Logon to the EdgeNode & secure copy the Spark directory from the NameNode.

root@EdgeNode:~# cd /usr/local
root@EdgeNode:/usr/local# scp -r root@NameNode:/usr/local/spark /usr/local/

After that we will set the environment variables in the EdgeNode accordingly.

root@EdgeNode:/usr/local# vi ~/.bashrc

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@EdgeNode:/usr/local# source ~/.bashrc

All set, so lets login to our spark shell and run an interactive map-reduce program in SPARK. So for that we will create a HDFS directory to park the output file.

root@EdgeNode:/usr/local# hadoop fs -mkdir -p /spark_analytics

Login to Spark shell. We will run a map-reduce program using spark. The sample file for this will be a data file in HDFS we have used during our PIG installation and basics tutorial.

root@EdgeNode:/usr/local# $SPARK_HOME/bin/spark-shell --master yarn

scala> val rdd_profit = sc.textFile("/pig_analytics/Profit_Q1.txt")
rdd_profit: org.apache.spark.rdd.RDD[String] = /pig_analytics/Profit_Q1.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd_profit.partitions.length
res0: Int = 2

scala> rdd_profit.count()
res1: Long = 17

scala> val header = rdd_profit.first()
header: String = Country|Date|Profit|Currency

scala> rdd_profit.take(3)
res2: Array[String] = Array(Country|Date|Profit|Currency, INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD)

scala> val rdd_profit_nohdr = rdd_profit.filter(row => row != header)
rdd_profit_nohdr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:28

scala> rdd_profit_nohdr.take(3)
res3: Array[String] = Array(INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD, SINGAPORE|2016-01-31|7000000|SGD)

scala> val rdd_profit_split = rdd_profit_nohdr.map(line => line.split("\\|"))
rdd_profit_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:30

scala> val rdd_profit_KV = rdd_profit_split.map{ (x) => (x(0), if( x(2).isEmpty) 0 else x(2).toInt ) }
rdd_profit_KV: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:32

scala> val rdd_profit_xAus = rdd_profit_KV.filter{case (key, value) => key != "AUSTRALIA"}
rdd_profit_xAus: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at filter at <console>:34

scala> val rdd_profit_q1 = rdd_profit_xAus.reduceByKey((v1,v2) => v1 + v2)
rdd_profit_q1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:34

scala> rdd_profit_q1.collect()
res4: Array[(String, Int)] = Array((US,35900000), (SINGAPORE,28600000), (INDIA,19200000))

scala> rdd_profit_q1.repartition(1).saveAsTextFile("/spark_analytics/profit")

scala> :quit

Finally we will check the output files in HDFS.

root@EdgeNode:~# hadoop fs -ls /spark_analytics/profit
Found 3 items
-rw-r--r--   2 root supergroup          0  /spark_analytics/profit/_SUCCESS
-rw-r--r--   2 root supergroup         35  /spark_analytics/profit/part-00000

root@EdgeNode:~# hadoop fs -tail /spark_analytics/profit/part-00000
(US,35900000)
(SINGAPORE,28600000)
(INDIA,19200000)

So we have finally installed SPARK in distributed Hadoop Yarn. We have learn how to create a Spark RDD from an HDFS file & run Map-Reduce program using Scala to get the sum of the sales for each country.