Monday, November 14, 2011

Storm Installation

New to Storm? My previous post could help you find your feet. In this post, we'll go the extra mile and install Storm. This has two aspects to it:
    - Setting up Storm locally
    - Setting up a Storm cluster
Let's begin with setting up Storm locally, which is hardly more than a two-step procedure.

Setting up Storm locally

This is kind of mandatory !!!
That's because even if your aim is to get topologies working on a cluster, submitting topologies to that cluster requires a 'storm client', which in turn requires Storm to be set up on your system locally.
Moreover, it is always better to dry-run topologies on your local system before deploying them as a jar on the cluster; it saves you from exhaustive debugging on the cluster. So moving forth, we'll undertake the following two tasks under this heading:
  1. Setting up Storm for running topologies on the local machine
  2. Setting up the Storm client
As an obvious prerequisite, you must be working on Linux with Java 6 installed.
So steps for accomplishing the first task :
  • Download a storm release from
    https://github.com/nathanmarz/storm/downloads
  • cd to the location where you unzipped the Storm release and test that bin/storm is executable, using any of these:
    - bin/storm ui
    - bin/storm supervisor
    - bin/storm nimbus
Next, to get the ball rolling on running topologies in Storm, the best starting point is the 'storm-starter' project using Eclipse. The steps for this are:
  1. Obtain the storm-starter project from its GitHub repository.
  2. Add the storm-0.5.*.jar and the other required jars present in the Storm setup to the build path of your Eclipse project.
  3. If you want to start with the simplest thing that could possibly work, the simplest part of this project, 'WordCountTopology.java', could do the trick.
  4. Since this topology uses the 'SplitSentence' bolt, which is implemented in Python, here's a Java substitute for the 'SplitSentence' class if your preference is Java.

public static class SplitSentence implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
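With the bolt above in place, here is a minimal sketch of how the main method of WordCountTopology wires everything together and runs it in local mode. RandomSentenceSpout and WordCount are the other classes shipped with storm-starter, and the integer component IDs follow the storm-0.5.x/0.6.x API (later releases switched to string IDs), so treat this as a sketch and adapt it to the release you downloaded:

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // spout 1 emits random sentences, bolt 2 splits them into words, bolt 3 counts each word
        builder.setSpout(1, new RandomSentenceSpout(), 5);
        builder.setBolt(2, new SplitSentence(), 8).shuffleGrouping(1);
        builder.setBolt(3, new WordCount(), 12).fieldsGrouping(2, new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        // run in local mode for a few seconds, then shut everything down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("word-count");
        cluster.shutdown();
    }

Running this class from Eclipse should print the emitted tuples on the console, since debug is turned on.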

Successfully accomplishing this leaves you with a verified environment for testing and running any Storm topology locally.

Setting up the Storm client

Communicating with a remote cluster and submitting topologies to it requires a Storm client on your system.
For this, configure the 'storm.yaml' file located in your Storm setup's conf folder by adding the following line to it, and place a copy of the file at '~/.storm/storm.yaml' :
 
   nimbus.host: "ip_of_your_remote_cluster's_nimbus"
For example :
   nimbus.host: "195.168.78.78"
As an important note, also check the permissions of this file so that it is readable.
Now you should be able to deploy jars on any remote cluster (steps to set up a remote cluster are listed later in this post) using :
    cd /path_to_your_storm_setup
    bin/storm jar location_of_jar_on_your_system/WordCount.jar storm.starter.WordCountTopology
and kill running topologies using
    bin/storm kill wordcount

Setting up a Storm Cluster

Time to kick off with setting up a Storm cluster. Here I am assuming a cluster of 3 machines, of which one is the master node (nimbus) and the other two are worker nodes.

Prerequisites :
  1. Java 6 and Python 2.6
  2. JAVA_HOME should be set (for example in ~/.bashrc) if it is not already
These should be installed on all the machines of the cluster.

Installation steps :
  • Setup the Zookeeper Cluster :
ZooKeeper is the coordinator for a Storm cluster; the interaction between nimbus and the worker nodes happens through ZooKeeper. So it is compulsory to set up a ZooKeeper cluster first. You can follow the instructions from here :
         http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
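As a quick sketch, a replicated ZooKeeper setup needs a conf/zoo.cfg on each ZooKeeper machine along the following lines, plus a myid file inside dataDir holding that server's number. The paths and ports here are only illustrative, and I am reusing the two supervisor machines that appear later in storm.zookeeper.servers:

    tickTime=2000
    dataDir=/var/zookeeper
    clientPort=2181
    initLimit=5
    syncLimit=2
    server.1=A.B.C.Sup1:2888:3888
    server.2=A.B.C.Sup2:2888:3888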
  • Install native dependencies
In local mode, Storm uses a pure Java messaging system, so you don't need to install native dependencies on your development machine. But in the case of a cluster, ZeroMQ and JZMQ are prerequisites on all the nodes of the cluster, including nimbus.

Download and installation commands for ZeroMQ 2.1.7 :

  • Obtain ZeroMQ using
wget  http://download.zeromq.org/zeromq-2.1.7.tar.gz               
  •  tar -xzf zeromq-2.1.7.tar.gz
  •  cd zeromq-2.1.7
  •  ./configure
  •  make
  •  sudo make install

Download and installation commands for JZMQ :

  •  Obtain JZMQ using 
git clone https://github.com/nathanmarz/jzmq.git                                   
  •  cd jzmq
  •  ./autogen.sh
  •  ./configure
  •  make 
  •  sudo make install


  • Copy the Storm setup to all the machines in the cluster. For clarity, assume the nimbus IP is A.B.C.Nimbus and the supervisor node IPs are A.B.C.Sup1 and A.B.C.Sup2. Edit the conf/storm.yaml file as follows:

storm.yaml file for the master node/nimbus :
storm.zookeeper.servers:
     - "A.B.C.Sup1"
     - "A.B.C.Sup2"

storm.local.dir: "path_to_any_dir_for_temp_storage"                               
java.library.path: "/usr/local/lib/"

nimbus.host: "127.0.0.1"
nimbus.task.launch.secs: 240
supervisor.worker.start.timeout.secs: 240
supervisor.worker.timeout.secs: 240  

storm.yaml file for all worker nodes :
storm.zookeeper.servers: 
      - "A.B.C.Sup1"
      - "A.B.C.Sup2" 
storm.local.dir: "path_to_any_dir_for_temp_storage"                               
java.library.path: "/usr/local/lib/"
nimbus.host: "A.B.C.Nimbus"
supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

Note : Also copy this storm.yaml file to the "~/.storm/" folder on the respective systems.
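Once the configuration files are in place, launch the daemons from the Storm setup directory on the respective machines, using the same bin/storm commands we tested locally, ideally under a process supervisor so that they are restarted if they die:

    bin/storm nimbus        (on the nimbus machine)
    bin/storm ui            (on the nimbus machine, for the web UI)
    bin/storm supervisor    (on each worker node)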
This completes the cluster setup, and you can now submit topologies to it from your own system after packaging them as a jar, using the Storm client described earlier in this post.

That's all from my end . . .  Hope it was helpful !!!

Friday, November 11, 2011

Twitter's Storm : Real-time Hadoop


The data processing ecosystem started to experience a scarcity of solutions that could process the rising volumes of structured and unstructured data. Traditional database management systems could in no way attain the required level of performance.
This massively growing data necessitated two kinds of processing solutions, owing to the nature of its sources. One was "batch processing", which could perform functions on enormous volumes of stored data, and the other was "realtime processing", which could continuously query the incoming data and stream out the results.

In this scenario, Hadoop proved to be a savior and brilliantly covered the first aspect of processing, i.e. batch processing, but a reliable solution for realtime processing that could perform as well as Hadoop does in its own sphere was yet to be conceived.

Storm seems to put an end to this search.


About Storm

"Storm is a distributed, fault-tolerant stream processing system," as stated by its developers. It can be called the "Hadoop of Realtime", as it fulfills all the requirements of a realtime processing platform. Parallel realtime computation is now a lot easier with Storm in the picture. It is meant for :
  • Stream processing : process messages and update a variety of databases.
  • Continuous computation : do continuous computation and stream out the results as they're computed.
  • Distributed RPC : parallelize an intense query so that you can compute it in realtime.


Terminologies
  • Topology : It is a graph of computation in which every node has a processing role to play. Just as we submit jobs in Hadoop, in Storm we submit topologies, which keep executing until they are killed (shut down).

  • Modes of Operation :
    - Local Mode : When topologies are developed and tested on the local machine.
    - Remote Mode : When topologies are submitted to and run on a remote cluster.

  • Nimbus : In case of a cluster, the master node is called Nimbus. To run topologies on the cluster, our local machine communicates with Nimbus, which in turn assigns the work to the cluster nodes.

  • Stream : An unbounded sequence of tuples which is processed in parallel in a distributed manner. Every stream has an id.

  • Spout : The source of streams in a topology. A spout generally obtains input from an external source and emits it into the topology. It can emit multiple streams, each with a different definition. A spout can be reliable (capable of replaying a tuple if it fails to be processed by the topology) or unreliable (it emits and forgets).

  • Bolt : Consumes any number of streams (from spouts or other bolts) and processes them to generate output streams. Complex computations may require multiple bolts.

  • Storm client : Installing the Storm release locally gives you a Storm client, which is used to communicate with remote clusters. It is run as storm_setup/bin/storm
 

Friday, September 23, 2011

Operating on HBase columns

HBase is a column-oriented database which stores its contents by column rather than by row. Instead of retrieving a record or row at a time, an entire column is retrieved, which makes it very powerful and efficient, since data analytics is usually concerned with only one field or column of a record. Access becomes much faster, and much more relevant data can be extracted from the database in a shorter period of time.

As already mentioned in my previous post, apart from the basic read, write and delete operations, I have developed another set of functions to perform union and intersection on HBase tables, and also to use the having, between and distinct clauses as we do in SQL. Since HBase retrieves entire columns at a time, functions for handling columns play a significant role.

The sample program below illustrates the following operations :

Obtaining all distinct entries of a column from a table
Obtaining all distinct entries of a column from a table along with the number of occurrences of each
Implementation of the Having operator
Implementing the Having operator and extracting the entire matching rows
Implementation of the Between operator
Implementation of the Union operator
Implementation of the Intersection operator

Program :

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTest
{
private static HBaseConfiguration conf;
HBaseTest()
    {
        conf = new HBaseConfiguration();
        conf.addResource(new Path("/path_to_your_hbase/hbase-0.20.6/conf/hbase-site.xml"));
    }

// function to obtain distinct col entries from a table.
   
public Set<String> distinct(String tableName,String colFamilyName, String colName)
   {
    Set<String> set = new HashSet<String>();
    ResultScanner rs=null;
    Result res = null;
    String s = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(colFamilyName),Bytes.toBytes(colName));
        rs = table.getScanner(scan);
        while((res=rs.next()) != null)
        {
            byte [] obtCol = res.getValue(Bytes.toBytes(colFamilyName+":"+colName));               
            s = Bytes.toString(obtCol);
            set.add(s);
        }
    } catch (IOException e)
    {
        System.out.println("Exception occured in retrieving data");
    }
    finally
    {
        if (rs != null) rs.close();
    }
    return set;
   }

// function to return distinct entries with the number of occurrences of each.
   
public  HashMap<String, Integer> distinctWithOccurances(String tableName,String colFamilyName, String  colName)
   {
    HashMap<String, Integer> map = new HashMap<String,Integer>();
    ResultScanner rs=null;
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        rs = table.getScanner(scan);
        String s = null;
        while((res=rs.next()) != null)
        {
            int noofOccurance = 0;
            int count=0;
            byte [] obtCol = res.getValue(Bytes.toBytes(colFamilyName+":"+colName));               
            s = Bytes.toString(obtCol);
            Set<String> set =  map.keySet();
            Iterator iterator = set.iterator();
            if(iterator != null)
            {
            while(iterator.hasNext() && count==0)
            {
                String colEntry = (String) iterator.next();
                if(colEntry.equals(s))
                {
                noofOccurance = map.get(colEntry);
                int newNoofOccurance = noofOccurance + 1;
                map.put(s,newNoofOccurance);
                count++;
                }
            }
            }
            if(count == 0)
            {
                map.put(s,1);
            }
        }
    } catch (IOException e)
    {
        System.out.println("Exception occured in retrieving data");
    }
    finally
    {
        if (rs != null) rs.close();
    }
    return map;
   }

// function implementing having clause.
   
public ArrayList<HashMap<String, String>> having(String tableName,String colFamilyName, String [] colName,String havingColName,String value)
  {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
                Bytes.toBytes(colFamilyName), Bytes.toBytes(havingColName), CompareOp.EQUAL, Bytes.toBytes(value));
              singleColumnValueFilterA.setFilterIfMissing(true); 
                FilterList filter = new FilterList(Operator.MUST_PASS_ALL, Arrays
                           .asList((Filter) singleColumnValueFilterA));
                scan.setFilter(filter); 
        rs = table.getScanner(scan);
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int j=0 ; j < colName.length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName),Bytes.toBytes(colName[j]));
            System.out.println(colName[j]);
            s = Bytes.toString(obtainedRow);
            map.put(colName[j],s);
        }           
        al.add(map);
        }
    } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
    finally
    {
        if (rs != null) rs.close();
    }
        return al;
   }

// function implementing having clause and extracting the entire rows.
   
public ArrayList<HashMap<String, String>> havingWithEntireRow(String tableName,String colFamilyName[], String [][] colName,String havingColFamilyName,String havingColName,String value)
  {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
                    Bytes.toBytes(havingColFamilyName), Bytes.toBytes(havingColName), CompareOp.EQUAL, Bytes.toBytes(value));
            singleColumnValueFilterA.setFilterIfMissing(true); 
            FilterList filter = new FilterList(Operator.MUST_PASS_ALL, Arrays
                           .asList((Filter) singleColumnValueFilterA));
                scan.setFilter(filter); 
            rs = table.getScanner(scan);
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int i=0 ; i< colFamilyName.length ; i++)
            {
            for(int j=0 ; j < colName[i].length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName[i]),Bytes.toBytes(colName[i][j]));
            s = Bytes.toString(obtainedRow);
            map.put(colName[i][j],s);
            }   
            }
            al.add(map);
        }
    } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
    finally
    {
        if (rs != null) rs.close();
    }
        return al;
    }

// function implementing the between clause.
   
public ArrayList<HashMap<String, String>> between(String tableName,String colFamilyName, String [] colName,String betweenColName,String lowerValue,String upperValue)
   {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
                Bytes.toBytes(colFamilyName), Bytes.toBytes(betweenColName), CompareOp.GREATER, Bytes.toBytes(lowerValue));
                singleColumnValueFilterA.setFilterIfMissing(true); 
        SingleColumnValueFilter singleColumnValueFilterB = new SingleColumnValueFilter(
                     Bytes.toBytes(colFamilyName), Bytes.toBytes(betweenColName), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(upperValue));
        singleColumnValueFilterB.setFilterIfMissing(true); 
        FilterList filter = new FilterList(Operator.MUST_PASS_ALL, Arrays.asList((Filter) singleColumnValueFilterA,
                                    singleColumnValueFilterB)); 
                scan.setFilter(filter); 
        rs = table.getScanner(scan);
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int j=0 ; j < colName.length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName),Bytes.toBytes(colName[j]));
            s = Bytes.toString(obtainedRow);
            map.put(colName[j],s);
            }           
            al.add(map);
        }
    } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
    finally
    {
        if (rs != null) rs.close();
    }
            return al;
   }

// function implementing union.

public ArrayList<HashMap<String, String>> union(String tableName,String colFamilyName1, String colFamilyName2,String [] colNames1,String [] colNames2, String colName1, String colName2,String value1,String value2)
   {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
                Bytes.toBytes(colFamilyName1), Bytes.toBytes(colName1), CompareOp.EQUAL, Bytes.toBytes(value1));
                singleColumnValueFilterA.setFilterIfMissing(true); 
                 
            SingleColumnValueFilter singleColumnValueFilterB = new SingleColumnValueFilter(
                     Bytes.toBytes(colFamilyName2), Bytes.toBytes(colName2), CompareOp.EQUAL, Bytes.toBytes(value2));
        singleColumnValueFilterB.setFilterIfMissing(true); 
                 
        FilterList filter = new FilterList(Operator.MUST_PASS_ONE, Arrays.asList((Filter) singleColumnValueFilterA,
                                    singleColumnValueFilterB)); 
                     
               scan.setFilter(filter); 
            rs = table.getScanner(scan);
        if(colFamilyName1.equals(colFamilyName2))
        {
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int j=0 ; j < colNames1.length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName1),Bytes.toBytes(colNames1[j]));
            System.out.println(colNames1[j]);
            s = Bytes.toString(obtainedRow);
            System.out.println(s);
            map.put(colNames1[j],s);
            }           
            al.add(map);
        }
        }
        else
        {
            while((res=rs.next()) != null)
            {
                HashMap<String, String> map = new HashMap<String,String>();
                String s = null;
                // extract row of the first col family
                for(int j=0 ; j < colNames1.length ; j++)
                {
                byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName1),Bytes.toBytes(colNames1[j]));
                s = Bytes.toString(obtainedRow);
                map.put(colNames1[j],s);
                }           
                // extract row of the second col family
                for(int k=0 ; k < colNames2.length ; k++)
                {
                byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName2),Bytes.toBytes(colNames2[k]));
                s = Bytes.toString(obtainedRow);
                map.put(colNames2[k],s);
                }       
                // put both in the arraylist
                al.add(map);
            }   
        }
    } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
    finally
    {
        if (rs != null) rs.close();
    }
    return al;
   }

// function implementing intersection.

public ArrayList<HashMap<String, String>> intersection(String tableName,String colFamilyName1, String colFamilyName2,String [] colNames1,String [] colNames2, String colName1, String colName2,String value1,String value2)
  {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
                    Bytes.toBytes(colFamilyName1), Bytes.toBytes(colName1), CompareOp.EQUAL, Bytes.toBytes(value1));
            singleColumnValueFilterA.setFilterIfMissing(true); 
        SingleColumnValueFilter singleColumnValueFilterB = new SingleColumnValueFilter(
                 Bytes.toBytes(colFamilyName2), Bytes.toBytes(colName2), CompareOp.EQUAL, Bytes.toBytes(value2));
                singleColumnValueFilterB.setFilterIfMissing(true); 
                 
        FilterList filter = new FilterList(Operator.MUST_PASS_ALL, Arrays.asList((Filter) singleColumnValueFilterA,
                                    singleColumnValueFilterB)); 
                scan.setFilter(filter); 
        rs = table.getScanner(scan);
        if(colFamilyName1.equals(colFamilyName2))
        {
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int j=0 ; j < colNames1.length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName1),Bytes.toBytes(colNames1[j]));
            s = Bytes.toString(obtainedRow);
            map.put(colNames1[j],s);
            }           
            al.add(map);
        }
        }
        else
        {
            while((res=rs.next()) != null)
            {
                HashMap<String, String> map = new HashMap<String,String>();
                String s = null;
                // extract row of the first col family
                for(int j=0 ; j < colNames1.length ; j++)
                {
                byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName1),Bytes.toBytes(colNames1[j]));
                s = Bytes.toString(obtainedRow);
                //System.out.println(s);
                map.put(colNames1[j],s);
                }           
                // extract row of the second col family
                for(int k=0 ; k < colNames2.length ; k++)
                {
                byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName2),Bytes.toBytes(colNames2[k]));
                s = Bytes.toString(obtainedRow);
                map.put(colNames2[k],s);
                }       
                // put both in the arraylist
                al.add(map);
            }   
        }
        } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
        finally
        {   
            System.out.println("in intersection");
            rs.close();
        }
        return al;
   }

public static void main(String args[])
    {
        HBaseTest test  = new HBaseTest();
        String tableName = "testing_table" ;
        String [] colFamilyNames = {"colFamily1","colFamily2"};
        String [][] colNames  = {{"Id","Name"},{"Addr","Designation"}};

    Set<String> set = test.distinct(tableName, "colFamily1", "Name");
    Iterator<String> iterator = set.iterator();
    while(iterator.hasNext())
        {
            String distinctEntry = iterator.next();
            System.out.println(distinctEntry);
        }

       HashMap<String, Integer> map = new HashMap<String,Integer>();
       map = test.distinctWithOccurances(tableName, "colFamily2", "Designation");

       ArrayList<HashMap<String, String>> al_having = new ArrayList<HashMap<String, String>>();
       al_having = test.havingWithEntireRow(tableName, colFamilyNames, colNames, "colFamily1", "Name", "Jayati");

       String [] reqdFieldNames1 = {"Id","Name"};
       String [] reqdFieldNames2 = {"Id","Name"};
       ArrayList<HashMap<String, String>> al_intersection = new ArrayList<HashMap<String, String>>();
       al_intersection = test.intersection(tableName, "colFamily1", "colFamily1", reqdFieldNames1, reqdFieldNames2, "Id", "Name", "1", "Jayati");
   
    // similarly union and between can be used
    }
}

Utilize basic read-write functions

HBase is an open source, non-relational, distributed database providing BigTable-like capabilities for Hadoop. Tables in HBase can be accessed using the Java API for HBase, but unfortunately a developer has to put in a lot of effort to do so, because the API provides a very restricted set of functions. For those new to the API, it takes a lot of time to understand the available classes and use them to get the required job done.

So to enable easy handling of HBase tables, I have developed a wrapper library over the existing API which provides basic methods to create, read and delete records in an HBase table, along with another set of functions such as distinct, having, between, intersection and union, which work for HBase just as they do in SQL. A big fraction of our work on tables depends on these functions, and their availability makes using the HBase API easy.

This post includes a sample program illustrating the usage of the read and write functions only, which specifically covers the following operations :

Adding entry to a single column
Adding records with a single column family and multiple columns
Adding a row with any number of column families and columns
Obtaining a single column entry
Obtaining the entire row
Reading all entries of a particular column of a table
Reading all records of an HBase Table
Deleting a record from an HBase Table

I have used hbase-0.20.6 and hadoop-0.20.1; you can import this program into Eclipse and write test classes to check it.

Program :

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTest
{
private static HBaseConfiguration conf;
HBaseTest()
    {
        conf = new HBaseConfiguration();
        conf.addResource(new Path("/path_to_your_hbase/hbase-0.20.6/conf/hbase-site.xml"));
    }

// assigns a value to a particular column of a record

public void addAColumnEntry(String tableName, String colFamilyName, String colName, String data)
    {
        try
        {
            HTable table = new HTable(conf, tableName);
            String row = "row" + Math.random();
            byte[] rowKey = Bytes.toBytes(row);
            Put putdata = new Put(rowKey);
            putdata.add(Bytes.toBytes(colFamilyName), Bytes.toBytes(colName),Bytes.toBytes(data));
            table.put(putdata);
        } catch (IOException e)
        {
            System.out.println("Exception occured in adding data");
        }
    }

// write a record to a table having just one column family or write only a portion of a record

public void addRecordWithSingleColumnFamily(String tableName, String colFamilyName, String [] colName,String [] data)
    {
        try
        {
            HTable table = new HTable(conf, tableName);
            String row = "row" + Math.random();
            byte[] rowKey = Bytes.toBytes(row);
            Put putdata = new Put(rowKey);
            if(colName.length == data.length)
            {
            for(int i=0 ; i < colName.length ; i++)
            putdata.add(Bytes.toBytes(colFamilyName), Bytes.toBytes(colName[i]),
                    Bytes.toBytes(data[i]));
            }
            table.put(putdata);
     
        } catch (IOException e)
        {
            System.out.println("Exception occured in adding data");
        }
    }

// add a record with any number of column families

public void addRecord(String tableName, String [] colFamilyName, String [][]  colName,String [][] data)
    {
        try
        {
            HTable table = new HTable(conf, tableName);
            String row = "row" + Math.random();
            byte[] rowKey = Bytes.toBytes(row);
            Put putdata = new Put(rowKey);
            for(int j=0 ; j < colFamilyName.length ; j++)
            {
            if(colName[j].length == data[j].length)
            {
            for(int i=0 ; i < colName[j].length ; i++)
            putdata.add(Bytes.toBytes(colFamilyName[j]), Bytes.toBytes(colName[j][i]),
                    Bytes.toBytes(data[j][i]));
            }
            }
            table.put(putdata);
     
        } catch (IOException e)
        {
            System.out.println("Exception occured in adding data");
        }
    }

// returns entry of a particular column of a record

public String getColEntry(String tableName, String rowName,String colFamilyName, String colName)
    {
        String result = null;
        try
        {
            HTable table = new HTable(conf, tableName);
            byte[] rowKey = Bytes.toBytes(rowName);
            Get getRowData = new Get(rowKey);
            Result res = table.get(getRowData);
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName),
                    Bytes.toBytes(colName));
            result = Bytes.toString(obtainedRow);
        } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
        return result;
    }
 
// returns a row  in the form of a string.
 
public String getRow(String tableName, String rowName,String colFamilyName, String [] colName)
    {
            String result = colName[0];
        try
        {
            HTable table = new HTable(conf, tableName);
            byte[] rowKey = Bytes.toBytes(rowName);
            Get getRowData = new Get(rowKey);
            Result res = table.get(getRowData);
            for(int j=0 ; j < colName.length ; j++)
            {
            byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName),Bytes.toBytes(colName[j]));
            System.out.println(colName[j]);
            String s = Bytes.toString(obtainedRow);
            if(j==0)
                result = colName[j] + "=" + s ;
            else
                result = result + "&" + colName[j] + "=" + s;
            System.out.println(s);
            }
         
        } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
        return result;
    }
 
// returns an arraylist of all entries of a column.
 
public ArrayList<String> getCol(String tableName,String colFamilyName, String colName)
    {
        ArrayList<String> al = new ArrayList<String>();
        ResultScanner rs=null;
        Result res = null;
     
        try {
            HTable table = new HTable(conf, tableName);
         
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes(colFamilyName),Bytes.toBytes(colName));
            rs = table.getScanner(scan);
            while((res=rs.next()) != null)
            {
                String colEntry = null;
                byte [] obtCol = res.getValue(Bytes.toBytes(colFamilyName+":"+colName));             
                colEntry = Bytes.toString(obtCol);
                al.add(colEntry);
            }
         
        } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
        finally
        {
            if (rs != null) rs.close();
        }
        return al;

    }
 
// returns a list of hashmaps, each hashmap containing entries of a single record.

public  ArrayList<HashMap<String, String>> getTable(String tableName,String [] colFamilyName, String [][] colName)
    {
    ResultScanner rs=null;
    ArrayList<HashMap<String, String>> al = new ArrayList<HashMap<String, String>>();
    Result res = null;
    try
    {
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        rs = table.getScanner(scan);
        while((res=rs.next()) != null)
        {
            HashMap<String, String> map = new HashMap<String,String>();
            String s = null;
            for(int i=0 ; i<colFamilyName.length ; i++)
            {
                for(int j=0 ; j < colName[i].length ; j++)
                    {
                        byte[] obtainedRow = res.getValue(Bytes.toBytes(colFamilyName[i]),Bytes.toBytes(colName[i][j]));
                        s = Bytes.toString(obtainedRow);
                        System.out.println(s);
                        map.put(colName[i][j],s);
                    }     
            }         
            al.add(map);
        }
    } catch (IOException e)
        {
            System.out.println("Exception occured in retrieving data");
        }
    finally
    {
        if (rs != null) rs.close();
    }
        return al;
    }

// function to delete a row from the table.

public String deleteTableRow(String tableName, String rowName)
   {
   String result = null;
   try
   {
    HTable table = new HTable(conf, tableName);
    byte[] rowKey = Bytes.toBytes(rowName);
    Delete delRowData = new Delete(rowKey);
    table.delete(delRowData);
   } catch (IOException e)
    {
        System.out.println("Exception occured in retrieving data");
    }
  return result;

  }

public static void main(String args[])
    {
        HBaseTest test  = new HBaseTest();
        String tableName = "testing_table" ;
        String [] colFamilyNames = {"colFamily1","colFamily2"};
        String [][] colNames  = {{"Id","Name"},{"Addr","Designation"}};
         
        test.addAColumnEntry(tableName, "colFamily1", "Name", "Ram");
        test.addRecordWithSingleColumnFamily(tableName, "colFamily1", new String[]{"Id","Name"}, new String[]{"117","Ram"});
        test.addRecord(tableName, colFamilyNames, colNames, new String[][]{{"117","Ram"},{"ABC","Manager"}});

        // specify the rowKey as per your table

        test.getColEntry(tableName, "your_row_key", "colFamily1", "Name");
        String yourRow = test.getRow(tableName, "row0.35234564623454", "colFamily1", new String[]{"Id","Name"});

        ArrayList<String> al = new ArrayList<String>();
        al = test.getCol(tableName,"colFamily1","Name");

        ArrayList<HashMap<String, String>> listofmaps = new ArrayList<HashMap<String, String>>();
        listofmaps = test.getTable(tableName,colFamilyNames,colNames);

    // specify the rowKey as per your table

    test.deleteTableRow(tableName, "your_row_key");
    }
}

My next post will cover the rest of the functions I have mentioned, which should hopefully help you perform almost any kind of operation on your table.
Suggestions would be welcomed !!!

Thursday, May 26, 2011

Frequent Errors during Installation or Startup of Oozie Server

In spite of the fact that the Oozie installation steps have been written down quite intelligibly and make it look like a matter of a few minutes, in my experience it's not that easy. So, to make it easier for you, one of my previous posts is a gist of all those docs that are meant to help you install Oozie. Still, in this post I am providing solutions to the 5 most common errors that you might have unfortunately encountered.

Error 1 :

Cannot create /var/run/oozie/oozie.pid: Directory nonexistent                               

Solution : 
Change the permissions and ownership of the run folder, as in
sudo chmod -cR 777 ./run 
sudo chown root:root -R /var/run/

Error 2 :

put: org.apache.hadoop.security.AccessControlException: Permission denied: user=jt, access=WRITE, inode="user":root:supergroup:rwxr-xr-x

Solution :
Add the following entry to your hadoop setup's conf/hdfs-site.xml 
<property> 
<name>dfs.permissions</name> 
<value>false</value> 
</property> 

Error 3 :

put:org.apache.hadoop.hdfs.server.namenode.SafeModeException:Cannot create directory /user/jt/examples. Name node is in safe mode.

Solution :
Use "hadoop dfsadmin -safemode leave" command to make the namenode leave safe mode forcefully. 
Or use "hadoop dfsadmin -safemode wait" to block till NN leaves by itself. 
If you need to get your cluster up and running quickly, you can manipulate the parameter dfs.namenode.threshold.percent. 
If you set it to 0, NN will not enter safe mode. 
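As a sketch, the property goes into your Hadoop setup's conf/hdfs-site.xml just like the entry shown for Error 2 (the name below is the Hadoop 0.20.x one):

<property> 
<name>dfs.safemode.threshold.pct</name> 
<value>0</value> 
</property> 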

Error 4 :

E0902: Exception occured: [java.io.IOException: Call to localhost/127.0.0.1:9000 failed on local exception: java.io.EOFException]

Solution : 
Check whether the ports of the JobTracker and the NameNode are correctly set in the job.properties file of the application you are running. 
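For instance, the relevant entries in the job.properties of the Oozie examples look roughly like this; the hostnames and ports are illustrative and must match fs.default.name and mapred.job.tracker in your Hadoop configuration:

    nameNode=hdfs://localhost:9000
    jobTracker=localhost:9001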

Error 5 :

Hadoop StartUp Issue :  Hadoop fs command not working and datanode is not running                                      

Solution : 
The namespaceID recorded in localpath_to_hadoop_data_store/dfs/data/current/VERSION and localpath_to_hadoop_data_store/dfs/name/current/VERSION should be the same; if it is not, change the one in the datanode(s) to match the namenode's.
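For reference, the line to compare in both VERSION files looks like the following; the actual value will of course differ on your setup:

    namespaceID=1165876896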

If one of these was a point where you were stuck, I hope to have helped you. All the very best with Oozie ...