Install Spark in a Hadoop Cluster
Apache Spark is a fast, general-purpose engine for large-scale data processing on a distributed cluster. It has an advanced DAG (directed acyclic graph) execution engine that supports acyclic data flow and in-memory computing. According to the project, Spark can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). To see Spark in action, let us first install it on a Hadoop YARN cluster.
Apache Spark
Spark provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs, so applications can be written quickly in any of these languages. It also ships with a rich stack of higher-level libraries for complex analytics: Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for streaming data.
An RDD is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create an RDD: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system such as HDFS, HBase, or any other source that offers a Hadoop InputFormat.
Master Server Setup
We will configure the cluster to host the Spark Master on the NameNode and the Spark slaves (workers) on the DataNodes. Log in to the NameNode over ssh to start the installation. Select the stable Spark release that is compatible with your Hadoop version from the link below:
http://spark.apache.org/downloads.html
At the time of writing, Spark 2.0.0 is the latest stable version. We will install Spark under the /usr/local/ directory.
root@NameNode:~# cd /usr/local/
root@NameNode:/usr/local/# wget http://www-us.apache.org/dist/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz
root@NameNode:/usr/local/# tar -xzf spark-2.0.0-bin-hadoop2.7.tgz
root@NameNode:/usr/local/# mv spark-2.0.0-bin-hadoop2.7 /usr/local/spark
root@NameNode:/usr/local# rm spark-2.0.0-bin-hadoop2.7.tgz
Set the Spark environment variables in the .bashrc file. Append the lines below to the file and then source it.
root@NameNode:/usr/local# vi ~/.bashrc
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@NameNode:/usr/local# source ~/.bashrc
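The same two export lines are added to .bashrc on the NameNode, on every DataNode and later on the EdgeNode, and appending them twice leaves duplicates. As a minimal sketch, a small shell function can make the step safe to repeat. The helper name append_once is hypothetical and is not part of Spark or Hadoop.

```shell
# append_once FILE LINE
# Hypothetical helper: appends LINE to FILE only when FILE does not already
# contain an identical line, so the step can be re-run safely.
append_once() {
  touch "$1"
  # If the file is non-empty and does not end with a newline, add one first.
  if [ -s "$1" ] && [ -n "$(tail -c 1 "$1")" ]; then
    echo >> "$1"
  fi
  # -x: match the whole line, -F: treat LINE as a fixed string, not a regex.
  grep -qxF -- "$2" "$1" || printf '%s\n' "$2" >> "$1"
}

# Example, using the paths from this article:
#   append_once ~/.bashrc 'export SPARK_HOME=/usr/local/spark'
#   append_once ~/.bashrc 'export PATH=$PATH:$SPARK_HOME/bin'
```

The single quotes in the example keep $PATH and $SPARK_HOME unexpanded, so the line written to the file is the same text shown above.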
Next, configure the Spark environment script to set the Java home, the Hadoop configuration directory and the number of cores each worker may use. Copy the template file, open spark-env.sh and append the lines below.
root@NameNode:/usr/local# cd $SPARK_HOME/conf
root@NameNode:/usr/local/spark/conf# cp spark-env.sh.template spark-env.sh
root@NameNode:/usr/local/spark/conf# vi spark-env.sh
export JAVA_HOME=/usr/lib/jvm/java-7-oracle/jre
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_WORKER_CORES=6
Next, list the DataNodes that will act as slave servers. Open the slaves file and add the DataNode hostnames, one per line. In our case we have two DataNodes.
root@NameNode:/usr/local/spark/conf# vi slaves
DataNode1
DataNode2
Slave Server Setup
Now we have to configure the DataNodes to act as slave servers. We will secure copy the spark directory, with its binaries and configuration files, from the NameNode to each DataNode.
root@NameNode:/usr/local/spark/conf# cd /usr/local
root@NameNode:/usr/local# scp -r spark DataNode1:/usr/local
root@NameNode:/usr/local# scp -r spark DataNode2:/usr/local
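With more than two DataNodes, typing one scp per host becomes tedious. The sketch below reads the hostnames from the slaves file created earlier and copies the Spark directory to each one. The function names list_workers and push_spark and the DRY_RUN variable are illustrative assumptions, not Spark tooling. By default the commands are only printed so they can be reviewed first.

```shell
# list_workers FILE
# Print one hostname per line, dropping '#' comments, whitespace and blank lines.
list_workers() {
  sed -e 's/#.*//' -e 's/[[:space:]]//g' -e '/^$/d' "$1"
}

# push_spark FILE
# Copy /usr/local/spark to /usr/local on every host listed in FILE.
# DRY_RUN defaults to 1 (print only); set DRY_RUN=0 to really run scp.
push_spark() {
  for host in $(list_workers "$1"); do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "scp -r /usr/local/spark ${host}:/usr/local"
    else
      scp -r /usr/local/spark "${host}:/usr/local"
    fi
  done
}

# Example: DRY_RUN=0 push_spark /usr/local/spark/conf/slaves
```

Reading the host list from the slaves file keeps a single source of truth: a DataNode added there later is picked up by the copy step as well.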
Next, update the Spark environment configuration on every DataNode. Append the two lines below to the .bashrc file on each of them.
root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# vi ~/.bashrc
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@DataNode1:~# source ~/.bashrc
root@DataNode1:~# exit
Repeat the above step for all the other DataNodes. That completes the installation and configuration, so it is time to start the Spark services.
root@NameNode:/usr/local# $SPARK_HOME/sbin/start-all.sh
Let us validate the services running in NameNode as well as in the DataNodes.
root@NameNode:/usr/local# jps
5721 NameNode
5943 SecondaryNameNode
6103 ResourceManager
6217 JobHistoryServer
6752 HQuorumPeer
6813 HMaster
16144 Master
16214 Jps
root@NameNode:/usr/local# ssh root@DataNode1
root@DataNode1:~# jps
3869 DataNode
4004 NodeManager
4196 HRegionServer
10322 Worker
10429 Jps
root@DataNode1:~# exit
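The check above can also be scripted. As a sketch, the hypothetical helper below reads jps output on standard input and succeeds only if a JVM with exactly the given name is listed. It assumes the two-column "pid name" format shown above.

```shell
# has_daemon NAME
# Reads `jps` output ("<pid> <name>" per line) on stdin and returns 0
# if a process called exactly NAME is present, 1 otherwise.
has_daemon() {
  awk -v name="$1" '$2 == name { found = 1 } END { exit !found }'
}

# Example, using the hostnames from this article:
#   jps | has_daemon Master && echo "Master is up"
#   ssh root@DataNode1 jps | has_daemon Worker && echo "Worker is up on DataNode1"
```

Comparing the whole second column, rather than grepping for a substring, avoids false positives such as SecondaryNameNode matching a search for NameNode. The ssh example assumes jps is on the PATH of a non-interactive shell on the DataNode.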
Browse the Spark Master UI at http://10.0.0.1:8080/ for details about the cluster resources (such as CPU and memory), worker nodes, running applications, etc.
In our case we have set up the NameNode as the Spark Master node. While an application is running, its Spark Application UI is also available on port 4040 of the host where the driver runs, here http://10.0.0.1:4040.
Configure EdgeNode to access Spark
Let us configure the EdgeNode (client node) to access Spark. Log on to the EdgeNode and secure copy the Spark directory from the NameNode.
root@EdgeNode:~# cd /usr/local
root@EdgeNode:/usr/local# scp -r root@NameNode:/usr/local/spark /usr/local/
After that we will set the environment variables in the EdgeNode accordingly.
root@EdgeNode:/usr/local# vi ~/.bashrc
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
root@EdgeNode:/usr/local# source ~/.bashrc
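Before launching the shell, it can help to confirm that the copy and the environment variables are in place. The following is a minimal sketch. The function name check_spark_home is an assumption made for illustration, and it only tests that bin/spark-shell exists and is executable.

```shell
# check_spark_home [DIR]
# Returns 0 if DIR (default: $SPARK_HOME) contains an executable bin/spark-shell.
check_spark_home() {
  dir="${1:-${SPARK_HOME:-}}"
  if [ -z "$dir" ]; then
    echo "SPARK_HOME is not set" >&2
    return 1
  fi
  if [ ! -x "$dir/bin/spark-shell" ]; then
    echo "no executable spark-shell under $dir/bin" >&2
    return 1
  fi
  echo "Spark found in $dir"
}

# Example: check_spark_home && echo "ready to start spark-shell"
```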
All set, so let us log in to the Spark shell and run an interactive map-reduce program. First, create an HDFS directory to hold the output file.
root@EdgeNode:/usr/local# hadoop fs -mkdir -p /spark_analytics
Log in to the Spark shell. The sample input is a data file in HDFS that we used in our Pig installation and basics tutorial.
root@EdgeNode:/usr/local# $SPARK_HOME/bin/spark-shell --master yarn
scala> val rdd_profit = sc.textFile("/pig_analytics/Profit_Q1.txt")
rdd_profit: org.apache.spark.rdd.RDD[String] = /pig_analytics/Profit_Q1.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd_profit.partitions.length
res0: Int = 2
scala> rdd_profit.count()
res1: Long = 17
scala> val header = rdd_profit.first()
header: String = Country|Date|Profit|Currency
scala> rdd_profit.take(3)
res2: Array[String] = Array(Country|Date|Profit|Currency, INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD)
scala> val rdd_profit_nohdr = rdd_profit.filter(row => row != header)
rdd_profit_nohdr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:28
scala> rdd_profit_nohdr.take(3)
res3: Array[String] = Array(INDIA|2016-01-31|4500000|INR, US|2016-01-31|9000000|USD, SINGAPORE|2016-01-31|7000000|SGD)
scala> val rdd_profit_split = rdd_profit_nohdr.map(line => line.split("\\|"))
rdd_profit_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:30
scala> val rdd_profit_KV = rdd_profit_split.map{ (x) => (x(0), if( x(2).isEmpty) 0 else x(2).toInt ) }
rdd_profit_KV: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:32
scala> val rdd_profit_xAus = rdd_profit_KV.filter{case (key, value) => key != "AUSTRALIA"}
rdd_profit_xAus: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at filter at <console>:34
scala> val rdd_profit_q1 = rdd_profit_xAus.reduceByKey((v1,v2) => v1 + v2)
rdd_profit_q1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:34
scala> rdd_profit_q1.collect()
res4: Array[(String, Int)] = Array((US,35900000), (SINGAPORE,28600000), (INDIA,19200000))
scala> rdd_profit_q1.repartition(1).saveAsTextFile("/spark_analytics/profit")
scala> :quit
Finally we will check the output files in HDFS.
root@EdgeNode:~# hadoop fs -ls /spark_analytics/profit
Found 2 items
-rw-r--r-- 2 root supergroup 0 /spark_analytics/profit/_SUCCESS
-rw-r--r-- 2 root supergroup 35 /spark_analytics/profit/part-00000
root@EdgeNode:~# hadoop fs -tail /spark_analytics/profit/part-00000
(US,35900000)
(SINGAPORE,28600000)
(INDIA,19200000)
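For a file this small, the Spark result can be cross-checked without Spark. The sketch below is not the Spark job itself but a plain awk equivalent of the same steps: drop the header, exclude AUSTRALIA, treat an empty profit as 0, and sum per country. The function name profit_by_country is an illustrative assumption. Rows are sorted by country, so their order can differ from the Spark output.

```shell
# profit_by_country
# Reads pipe-delimited rows (Country|Date|Profit|Currency) on stdin and
# prints "(COUNTRY,TOTAL)" lines, mirroring the steps of the Spark job.
profit_by_country() {
  awk -F'|' '
    NR == 1           { header = $0 }   # remember the first line as the header
    $0 == header      { next }          # like filter(row => row != header)
    $1 == "AUSTRALIA" { next }          # like filter(key != "AUSTRALIA")
    { sum[$1] += ($3 == "" ? 0 : $3) }  # empty profit counts as 0
    END { for (c in sum) printf "(%s,%d)\n", c, sum[c] }
  ' | sort
}

# Example: hadoop fs -cat /pig_analytics/Profit_Q1.txt | profit_by_country
```

If the totals printed here match the contents of part-00000, the filter, map and reduceByKey steps behaved as intended.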
So we have finally installed Spark on a distributed Hadoop YARN cluster. We have learned how to create a Spark RDD from an HDFS file and run a map-reduce program in Scala to get the total profit for each country.