Monday, September 7, 2015

Deploying the Spark-0.7.x Cluster in Standalone Mode


To deploy the Spark Cluster in Standalone Mode, run the following script, present in the Spark setup, on the cluster's Master node:

bin/start-all.sh

If everything is fine, the Spark Master's web UI should be accessible on port 8083 of the Master node (or on whichever port is specified in "spark-env.sh"; if no port is specified explicitly, the default is 8080).

Run a Spark Application


Simple Spark applications such as Word Count are available in "spark-0.7.x/examples/src/main/java/spark/examples".

Most of them expect a "master" argument specifying where to run the application: this should be "local" when running locally on a single machine, and "spark://MASTER_IP:PORT" (for example spark://192.10.0.3:7077) when running against a Standalone cluster.
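
To make the role of the "master" argument concrete, below is a condensed word-count sketch along the lines of the bundled JavaWordCount example. It assumes Spark 0.7.x's Java API (classes under the "spark.api.java" package); the application name and jar path are placeholders, so treat it as a sketch rather than a drop-in program.

import java.util.Arrays;
import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;

public class SimpleWordCount {
  public static void main(String[] args) {
    // args[0] is the "master": "local" or "spark://MASTER_IP:PORT"
    // args[1] is the input file, args[2] the output directory
    JavaSparkContext sc = new JavaSparkContext(args[0], "SimpleWordCount",
        System.getenv("SPARK_HOME"), "simple-word-count.jar");

    JavaRDD<String> lines = sc.textFile(args[1]);

    // split each line into words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) { return Arrays.asList(line.split(" ")); }
    });

    // pair each word with a count of 1, then sum the counts per word
    JavaPairRDD<String, Integer> counts = words.map(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });

    counts.saveAsTextFile(args[2]);
  }
}

Assuming the "./run" launcher script that ships with Spark 0.7.x, the bundled example can be launched against the Standalone cluster with something like "./run spark.examples.JavaWordCount spark://192.10.0.3:7077 /data/input.txt" (the input path here is a placeholder).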

The job submitted should appear on the Spark Master's Web UI as well.

Deploy the Mesos Cluster


Spark can also be run over a cluster managed by Mesos. For this, a Mesos cluster needs to be deployed first; the steps are available in the "Setting up a Mesos Cluster" post.

To start a Mesos Cluster, execute the command below on the Mesos Master:
/usr/local/sbin/mesos-start-cluster.sh

If all goes well, Mesos's web UI should be visible on port 8080 of the master machine.

Run a Spark Application over Mesos


To run Spark applications over the Mesos Cluster, specify "mesos://MASTER_IP:PORT" as the value of the "master" argument, for example "mesos://192.10.0.3:5050".
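
For instance, assuming the "./run" launcher script and the bundled JavaWordCount example mentioned earlier (the input path is a placeholder), the invocation could look like:

./run spark.examples.JavaWordCount mesos://192.10.0.3:5050 /data/input.txt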

The job submitted should appear on the Mesos Master's Web UI as well.

Friday, September 4, 2015

Setting up a Mesos-0.9.0 Cluster

Apart from running in Standalone mode, Spark can also run on clusters managed by Apache Mesos. "Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks." 

In addition to Spark, Hadoop, MPI and Hypertable also support running on clusters managed by Apache Mesos. 

Listed below are the steps for deploying a Mesos Cluster. These should be run on the node that is to be the Master node of the Mesos Cluster.

1. Mesos 0.9.0-incubating can be downloaded from:
http://archive.apache.org/dist/incubator/mesos/mesos-0.9.0-incubating/                                                                                                   

2. Extract Mesos setup
tar -xvzf  mesos-0.9.0-incubating.tar.gz                                                                                                                                                                                                    

3. Change the current working directory to the extracted Mesos setup in order to compile it.
cd mesos-0.9.0                                                                                                                                                                                                                                                         

The JAVA_HOME to be used needs to be specified while configuring Mesos. This can be done by passing the "--with-java-home" option to the configure command, as shown below:
./configure --with-java-home=/usr/lib/jvm/jre-1.6.0-openjdk.x86_64                                                                                                  

After configuring, run the following two commands:
make 
sudo make install                                                                                                                                                                                        

The following files and directories should have been created, if the above steps have been executed successfully:
/usr/local/lib/libmesos.so 
/usr/local/sbin/mesos-daemon.sh  
/usr/local/sbin/mesos-slave             
/usr/local/sbin/mesos-start-masters.sh  
/usr/local/sbin/mesos-stop-cluster.sh 
/usr/local/sbin/mesos-stop-slaves.sh
/usr/local/sbin/mesos-master     
/usr/local/sbin/mesos-start-cluster.sh  
/usr/local/sbin/mesos-start-slaves.sh   
/usr/local/sbin/mesos-stop-masters.sh
/usr/local/var/mesos/conf/mesos.conf.template
/usr/local/var/mesos/deploy                                                                                                                                                                                                    

4. Add the MESOS_NATIVE_LIBRARY variable declaration to "conf/spark-env.sh" in Spark's "conf" directory as shown below:
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so                                                                                                                                           

5. Copy the Mesos setup to the same location on all the nodes to be included in the Mesos cluster, or simply set up Mesos on each of them by running:
cd mesos-0.9.0
sudo make install                                                                                                                                                                                                                                           

This completes the process of setting up a Mesos Cluster.

Configure Mesos for deployment


1. On the Mesos Cluster's master node, edit "/usr/local/var/mesos/deploy/masters" to list the IP of the Master node and "/usr/local/var/mesos/deploy/slaves" to list the IPs of the slave nodes.

2. On all nodes of the Mesos Cluster, edit "/usr/local/var/mesos/conf/mesos.conf" and add the line master=HOST:5050, where HOST is the IP of the Mesos Cluster's master node.
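
For example, for a Mesos cluster whose master is 192.10.0.3 and whose slaves are 192.10.0.4 and 192.10.0.5 (hypothetical IPs), the three files would look like:

/usr/local/var/mesos/deploy/masters (on the master node):
192.10.0.3

/usr/local/var/mesos/deploy/slaves (on the master node):
192.10.0.4
192.10.0.5

/usr/local/var/mesos/conf/mesos.conf (on every node):
master=192.10.0.3:5050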

This is the end of the Configuration Phase.

Good luck.

Setting up Spark-0.7.x in Standalone Mode


A Spark Cluster in Standalone Mode comprises one Spark Master process and multiple Spark Worker processes. Standalone mode can be used both on a single local machine and on a cluster. This mode does not require any external resource manager such as Mesos.



To deploy a Spark Cluster in Standalone mode, the following steps need to be executed on any one of the nodes.

1. Download the spark-0.7.x setup from: 
http://spark.apache.org/downloads.html

2. Extract the Spark setup
tar -xzvf spark-0.7.x-sources.tgz

3. Spark requires Scala's bin directory to be present in the PATH variable of the Linux machine. Scala 2.9.3 for Linux can be downloaded from:
http://www.scala-lang.org/downloads

4. Extract the Scala setup  
tar -xzvf scala-2.9.3.tgz

5. Export the Scala home by appending the following line to "~/.bashrc" (for CentOS) or "/etc/environment" (for Ubuntu):
export SCALA_HOME=/location_of_extracted_scala_setup/scala-2.9.3

6. Spark can be compiled with "sbt" or built using Maven. This post uses the former method because of its simplicity. To compile, change directory to the extracted Spark setup and execute the following command:
sbt/sbt package

7. Create a file (if not already present) called "spark-env.sh" in Spark’s "conf" directory, by copying "conf/spark-env.sh.template", and add the SCALA_HOME variable declaration to it as described below:
export SCALA_HOME=<path to Scala directory>

The web UI ports for the Spark Master and Worker can also optionally be specified by appending the following to "spark-env.sh":
export SPARK_MASTER_WEBUI_PORT=8083
export SPARK_WORKER_WEBUI_PORT=8084
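
Putting it together, a minimal "conf/spark-env.sh" for this setup could therefore look like the following (the Scala path is a placeholder for wherever the Scala setup was extracted):

export SCALA_HOME=/usr/local/scala-2.9.3
export SPARK_MASTER_WEBUI_PORT=8083
export SPARK_WORKER_WEBUI_PORT=8084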

8. To specify the nodes that will act as Workers, their IPs are to be listed in "conf/slaves". For a cluster containing two worker nodes with IPs 192.10.0.1 and 192.10.0.2, "conf/slaves" would contain:
192.10.0.1
192.10.0.2

This completes the setup process on one node. 

For setting up Spark on the other nodes of the cluster, the Spark and Scala setups should be copied to the same locations on the rest of the nodes.

Lastly, edit the /etc/hosts file on all the nodes to add the "IP HostName" entries of all the other nodes in the cluster.
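
For example, for the two workers above plus a master at 192.10.0.3 (hypothetical IPs and hostnames), each node's /etc/hosts would contain entries such as:

192.10.0.3  spark-master
192.10.0.1  spark-worker1
192.10.0.2  spark-worker2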

Hope that helps !!

Spark Overview

Spark is a cluster computing framework, i.e. a framework that uses multiple workstations, multiple storage devices, and redundant interconnections to present them as a single, highly available system.

Spark offers the following features:

- open-source under BSD licence 
- in-memory processing
- multi-language APIs in Scala, Java and Python
- rich array of parallel operators
- runnable on Apache Mesos, YARN, Amazon EC2 or in standalone mode
- best suited for highly iterative jobs
- efficient for interactive data mining jobs

Need of Spark


Spark is a result of the rapid ongoing developments in the Big Data world. "Big Data" is a term used to describe data characterized by the three V's: Volume, Variety and Velocity. Storing and processing Big Data demanded specialized frameworks. Hadoop is the predominantly established software framework, composed of a file system called the Hadoop Distributed File System (HDFS) and Map-Reduce. Spark is a strong contender to Map-Reduce: thanks to its in-memory processing capability, Spark offers lightning-fast results compared to Map-Reduce.

When to use Spark?


Apart from common data processing applications, Spark should be used specifically for applications where in-memory operations account for a major share of the processing. The following are the types of applications in which Spark specializes:

1. Iterative algorithms
2. Interactive data mining

Who is using Spark?


UC Berkeley's AMPLab is the developer of Spark. Apart from Berkeley, which runs large-scale applications such as spam filtering and traffic prediction on Spark, 14 other companies, including Conviva and Quantifind, have contributed to it.

Saturday, July 25, 2015

Mahout’s Naïve Bayes: Test Phase

This post is a continuation of my previous post, where Mahout's Naive Bayes "trainnb" command was explained. This one describes the internal execution steps of the "testnb" command, which is used to test the model we built using "trainnb".

Command Line Options for "testnb"


Generic Options

 -archives <paths>: comma separated archives to be unarchived on the compute machines.
 -conf <configuration file>: specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-Specific Options
                                                       
  --input (-i) input: Path to job input directory.
  --output (-o) output: The directory pathname for output.
  --testComplementary (-c): test complementary?
  --labelIndex (-li) labelIndex: The path to the location of the label index
  --overwrite (-ow): If present, overwrite the output directory before running job
  --help (-h): Print out help
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run
  --endPhase endPhase: Last phase to run
  --runSequential (-seq): run sequential?
  --model (-m) model: The path to the model built during training
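
Tying these options together, a typical "testnb" invocation could look like the following (all HDFS paths are placeholders):

bin/mahout testnb -i /user/hadoop/test-vectors -m /user/hadoop/model -li /user/hadoop/labelindex -o /user/hadoop/testing-results -ow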

Flow of execution of the "testnb" command



Hope it helped!


Mahout’s Naïve Bayes: Train Phase

Mahout’s Naïve Bayes Classification algorithm executes in two phases:
  1. Train Phase: Trains a model using pre-processed train data
  2. Test Phase: Classify documents (pre-processed) with the help of the model
This blog post provides a code-level understanding of the training process of the algorithm; the testing phase is covered in the next blog post. The Mahout command "trainnb" is used to train a Naive Bayes model in Mahout.

"trainnb" command in Mahout



For reference, above is the structure of the train data directory specified as an input (similar to the one used in data pre-processing).

Command Line options for “trainnb”


Generic Options


 -archives <paths>: comma separated archives to be unarchived on the compute machines.
 -conf <configuration file>: specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-Specific Options
                                                          

  --input (-i) input: Path to job input directory.
  --output (-o) output: The directory pathname for output.
  --labels (-l) labels: comma-separated list of labels to include in training
  --extractLabels (-el): Extract the labels from the input
  --alphaI (-a) alphaI: smoothing parameter
  --trainComplementary (-c): train complementary?
  --labelIndex (-li) labelIndex: The path to store the label index in
  --overwrite (-ow): If present, overwrite the output directory before running job
  --help (-h): Print out help
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run
  --endPhase endPhase: Last phase to run
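
Tying these options together, a typical "trainnb" invocation could look like the following (all HDFS paths are placeholders):

bin/mahout trainnb -i /user/hadoop/train-vectors -o /user/hadoop/model -li /user/hadoop/labelindex -el -ow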

Flow of execution of the "trainnb" command



Hope it helped!

Query Storm Data Streams using Esper

As you might already be aware and as documented on the web, “Esper is a component for complex event processing (CEP) which enables rapid development of applications that process large volumes of incoming messages or events, regardless of whether incoming messages are historical or real-time in nature.” Since integrating Storm with Esper is clearly beneficial for complex processing of the tuples arriving on a real-time stream in Storm bolts, a specialized bolt called the “Esper Bolt” is available.

The Esper Bolt enables you to run SQL-like queries on your real-time data in Storm. Since the Esper Bolt is already available, you do not need to write such a bolt yourself; you only use it while defining the main Topology class. Below is an illustration of a topology with a SampleSpout emitting tuples containing three fields, namely "deviceId", "deviceTemp" and "time". These field names are declared in the spout class in a method called declareOutputFields(), as shown below:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("deviceId", "deviceTemp", "time"));
}

This method is one of the methods to be overridden in both spouts and bolts; it declares the names of all the fields that will be emitted per tuple.

Moving forward with the topology definition, below is the Esper Bolt declaring a statement (query) that runs over every tuple in the incoming stream and emits "deviceId" (the id of the device) and "deviceTemp" (the temperature of the device) if the value of the incoming tuple field "deviceTemp" lies between 2000.00 and 3000.00:

{
    TopologyBuilder builder = new TopologyBuilder();

    // add the spout that emits ("deviceId", "deviceTemp", "time") tuples
    builder.setSpout("deviceDataAbsorber", new SampleSpout(), 1);

    // define the Esper bolt: map the spout's fields to an event type called "Evaluate"
    // and emit only the tuples whose "deviceTemp" lies between 2000.00 and 3000.00
    EsperBolt deviceTrackerBolt = new EsperBolt.Builder()
        .inputs().aliasComponent("deviceDataAbsorber")
                 .withFields("deviceId", "deviceTemp", "time")
                 .ofTypes(String.class, Float.class, String.class)
                 .toEventType("Evaluate")
        .outputs().onDefaultStream().emit("deviceId", "deviceTemp")
        .statements().add("select evaluate.deviceId as deviceId, evaluate.deviceTemp as deviceTemp"
            + " from Evaluate.win:time_batch(1 sec) as evaluate"
            + " where evaluate.deviceTemp <= 3000.00f and evaluate.deviceTemp >= 2000.00f")
        .build();

    // wire the Esper bolt into the topology, subscribing it to the spout's stream
    builder.setBolt("deviceTracker", deviceTrackerBolt, 1).shuffleGrouping("deviceDataAbsorber");
}

The topology can have any number of Esper Bolts or user-defined bolts feeding upon the output of the Esper Bolt. You can define similar query statements depending upon your problem statement.
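
To actually run the illustration above, the topology still has to be submitted. A minimal sketch using Storm's LocalCluster (backtype.storm.Config and backtype.storm.LocalCluster, suitable for local testing; the topology name is arbitrary) could be:

// submit the topology to an in-process cluster for local testing
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("deviceMonitoringTopology", conf, builder.createTopology());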

Hope it helped!