Spark-HBase Connector - The Ultimate Guide

If you are a Data Engineer working in the Big Data ecosystem, you need your components to talk to one another. Spark is leading the data processing engine space, with sizeable contributions and adoption from organisations across many industries. With that in mind, I am planning a series of posts on various components communicating with each other using connectors.

Today, let's start with the Spark-HBase connector. HBase is a very popular, widely used, distributed and scalable Big Data store for Hadoop, which makes the use case of connecting Spark with HBase all the more common and in demand.

The Apache HBase documentation reads as follows,

Apache HBase - Use Apache HBase™ when you need random, realtime read/write access to your Big Data. This project's goal is the hosting of very large tables -- billions of rows X millions of columns -- atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google's Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

Spark Hortonworks Connector (shc-core)

shc-core is a Hortonworks-supported connector that works perfectly as a bridge between Spark and HBase.

The Apache Spark - Apache HBase Connector is a library that lets Spark access HBase tables as an external data source or sink. With it, users can operate on HBase with Spark SQL at the DataFrame and DataSet level. Thanks to that DataFrame and DataSet support, the library leverages all the optimization techniques in Catalyst, and achieves data locality, partition pruning, predicate pushdown, scanning and BulkGet, etc.

Github - github.com/hortonworks-spark/shc

Maven - mvnrepository.com/artifact/com.hortonworks...

<!-- https://mvnrepository.com/artifact/com.hortonworks.shc/shc-core -->
<dependency>
    <groupId>com.hortonworks.shc</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.0.3.1.7.0-79</version>
</dependency>
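
If your build is sbt-based instead of Maven, the equivalent coordinates would look roughly like this. This is only a sketch: the Hortonworks public repository URL below is an assumption, so point the resolver at whatever repository actually hosts the artifact for your stack.

// Assumed resolver URL - verify it for your distribution
resolvers += "Hortonworks" at "https://repo.hortonworks.com/content/groups/public/"
libraryDependencies += "com.hortonworks.shc" % "shc-core" % "1.1.0.3.1.7.0-79"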

Now for the how-to part -

1. Define the Catalog -

The very first step is to define the catalog for your HBase table.

For each table, a catalog has to be provided. It specifies the row key and the columns, along with their data types and column families, and defines the mapping between HBase columns and the DataFrame schema. The catalog is user defined, in JSON format.

Sample catalog -

def catalog =
      s"""{
         |"table":{"namespace":"your_table_namespace", "name":"your_table_name"},
         |"rowkey":"hbase_key_column",
         |"columns":{
         |"your_df_column_1":{"cf":"rowkey", "col":"hbase_key_column", "type":"string"},
         |"your_df_column_2":{"cf":"hbase_col_family1", "col":"hbase_column_1", "type":"string"},
         |"your_df_column_3":{"cf":"hbase_col_family1", "col":"hbase_column_2", "type":"string"},
         |"your_df_column_4":{"cf":"hbase_col_family2", "col":"hbase_column_3", "type":"string"}
         |}
         |}""".stripMargin

I have tried to make the catalog as verbose and as human readable as possible. I hope this helps whenever you define one for your own table; I struggled a bit initially while defining catalogs for mine.

2. Write to HBase

Given a DataFrame with a schema matching the catalog, the snippet below will create an HBase table with 5 regions and save the DataFrame into it. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
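
The data collection referenced in the snippet below is simply whatever records you want to persist. As a minimal, hypothetical sketch (the case class name and the values are mine, not part of the connector; only the field names matter, since they must match the column names in the catalog):

case class Record(your_df_column_1: String, your_df_column_2: String,
                  your_df_column_3: String, your_df_column_4: String)

val data = Seq(
  Record("row1", "cf1_val1", "cf1_val2", "cf2_val1"),
  Record("row2", "cf1_val3", "cf1_val4", "cf2_val2")
)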

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import sqlContext.implicits._
sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

3. Read from HBase

This is how you can read an HBase table, provided a well-defined catalog. The rest I will leave to you; once you have a DataFrame/DataSet at your disposal, I am sure you can take it from there.

val df = sqlContext
         .read
         .options(Map(HBaseTableCatalog.tableCatalog->catalog))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()
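
Since the result is a plain DataFrame, all the usual Spark SQL operations apply, and filters on the row-key column can be pushed down to HBase. A quick, illustrative example using the hypothetical column names from the catalog above:

// Filter on the row-key column and project a couple of columns
df.filter($"your_df_column_1" === "some_row_key")
  .select("your_df_column_2", "your_df_column_4")
  .show()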

Common Issues -

If you have followed the steps up to this point and everything runs perfectly, you can skip this part of the post. If not, let's look at the most common problems we face while using this connector.

1. java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

Solution - You have not provided the HBase libs to your Spark job, or you have misconfigured the HBase lib paths. All you need to do is pass the HBase lib path to your Spark job, like this,

--driver-java-options -Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/*

or

--jars /your_hbase_lib/hbase-common.jar,/your_hbase_lib/protobuf-java.jar,/your_hbase_lib/guava.jar,/your_hbase_lib/zookeeper.jar,/your_hbase_lib/hbase-protocol.jar,/your_hbase_lib/hbase-server.jar

2. java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.execution.datasources.hbase

Solution - There are two possible scenarios here. The first is that you are not passing the shc-core jar to your Spark job. This one is pretty straightforward: pass the shc-core jar to your job and you will get rid of this error.

The second scenario is a bit trickier: there may be a version mismatch between some of the components (Spark, HBase or Hadoop) that your shc connector was built against and those in your environment. That was the case with my connector; there was no shc-core version available that matched our Hadoop stack. In this case you have to build a custom uber shc-core jar by changing the component versions in the shc pom file, then build the jar using

mvn clean install -DskipTests and use that custom jar for your application. This works like a charm.

3. java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;

Solution - This is a typical multiple-versions-of-the-same-dependency issue, specifically posed by the json4s-jackson dependency.

Run the mvn dependency:tree command and you will see multiple versions of org.json4s:json4s-jackson_2.11 (e.g. 3.2.11 alongside another version). Keep the latest and exclude the older version from the other package (an <exclusions> block in your pom does the trick).
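
If your build happens to be sbt-based instead, the same exclusion looks roughly like this. The group and artifact names below are placeholders for whichever dependency drags in the old json4s on your classpath:

// Placeholder coordinates - exclude the transitive json4s from the offending dependency
libraryDependencies += ("your.group" % "artifact-pulling-old-json4s" % "x.y.z")
  .exclude("org.json4s", "json4s-jackson_2.11")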

4. Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:

Solution - This is again one of the common issues you might end up with. These connection failures happen due to wrong HBase configuration. The easiest way to fix this is to pass the hbase-site.xml to your application through the --files option of your spark-submit command.

And just like that you can connect to your HBase, start making some dents in your small universe, and get your Spark applications talking to HBase.

Until then, Keep Learning and Keep Growing!