Bulk Loading in HBase with Practice in MR & Spark
Bulk loading is an HBase feature for ingesting large amounts of data efficiently. In this post, I am going to share some basic concepts of bulk loading and its practice in MapReduce and Spark.
1. Overview of Bulk Loading
I highly recommend you read this post from Cloudera first, since it illustrates almost everything about bulk loading in HBase. Then I will provide some supplementary knowledge here.
Why is bulk loading efficient? When we perform a normal Put or Delete on HBase, HBase first logs the operation in the Write Ahead Log (WAL) and stores the entry in the MemStore. Whenever the MemStore is full enough, the RegionServer flushes it to disk in the HFile format (the on-disk component of HBase's LSM tree). Bulk loading instead generates and loads HFiles directly, thus bypassing this write path. See more details about the HBase Write Path.
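For contrast, here is a minimal sketch of the normal write path from the client side, written in Scala (the language used in the Spark section below). The table name and column coordinates are placeholders; every Put issued this way is logged to the WAL and buffered in the MemStore before eventually being flushed as an HFile.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("my_table"))

// This single Put travels the normal write path: WAL first, then MemStore,
// and it only reaches an HFile when the RegionServer flushes the MemStore.
val put = new Put(Bytes.toBytes("row-1"))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"))
table.put(put)

table.close()
connection.close()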
2. Practice in MapReduce
Let’s look at the following figure first to get a better understanding of what happens during the ETL process of bulk loading.
As this post suggests, we need to write the Mapper ourselves, which is supposed to emit the Row Key as the key, and either a KeyValue, a Put, or a Delete as the value. The Reducer is handled by HBase.
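The Mapper itself is not spelled out in that post, so here is a hypothetical sketch, written in Scala like the later snippets. It assumes each input line has the form rowkey,value and emits a Put keyed by the row key; the column family cf and qualifier q are placeholders.
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

// Emits (row key, Put) pairs for the bulk-load job to consume.
class BulkLoadMapper extends Mapper[LongWritable, Text, ImmutableBytesWritable, Put] {
  override def map(key: LongWritable, value: Text,
                   context: Mapper[LongWritable, Text, ImmutableBytesWritable, Put]#Context): Unit = {
    // Hypothetical input format: "rowkey,value".
    val fields = value.toString.split(",", 2)
    if (fields.length == 2) {
      val rowKey = Bytes.toBytes(fields(0))
      val put = new Put(rowKey)
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(fields(1)))
      context.write(new ImmutableBytesWritable(rowKey), put)
    }
  }
}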
The following snippet configures the MapReduce job. The variable bulkLoadPath denotes the path where the generated HFiles will be written, and tableName is the name of the HBase table.
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat.class);
// The generated HFiles are written under bulkLoadPath.
FileOutputFormat.setOutputPath(job, new Path(bulkLoadPath));
// Inspects the table and sets up the total-order partitioner and the sort reducer.
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, tableName));
Further, this snippet shows how to load these HFiles into HBase after the MapReduce job finishes.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loader.doBulkLoad(new Path(bulkLoadPath), hTable);
3. Practice in Spark
Things tend to be a little tougher for Spark. As far as I know, there has been no official support for direct bulk loading in Spark, but the HBaseRDD library is really helpful and sheds light on this issue. In what follows, I will try to illustrate the approach in a systematic way.
It is time to look at this figure again; now assume that we have finished the EXTRACT phase and obtained an RDD.
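To make the rest concrete, assume the EXTRACT phase leaves us with records of the form (row key, qualifier, value); the input path and field layout below are hypothetical. We key the RDD by (row key bytes, qualifier) so it can be partitioned and sorted into regions later.
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("hbase-bulk-load"))

// Hypothetical EXTRACT phase: each input line is "rowkey,qualifier,value".
val rdd = sc.textFile("hdfs:///path/to/input")
  .map(_.split(",", 3))
  .collect { case Array(row, qualifier, value) =>
    ((Bytes.toBytes(row), qualifier), value)
  }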
However, now we don’t have a MapReduce job to do everything for us, so why not simulate it? For the Mapper, we just need to generate the corresponding format, namely (Row Key, KeyValue) (or Put/Delete); for the Reducer, it does the following:
- Inspects the table to configure a total order partitioner
- Uploads the partitions file to the cluster and adds it to the DistributedCache
- Sets the number of reduce tasks to match the current number of regions
- Sets the output key/value class to match HFileOutputFormat’s requirements
- Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
Thus, it is clear that we need to partition the key-value data by region and sort it. This seems easy, because Spark provides the built-in RDD function repartitionAndSortWithinPartitions, as long as we supply a partitioner (and an implicit Ordering on the key type is in scope). It is important to note that we CANNOT transform the RDD into HBase-format data before this step, because that would cause a serialization error: KeyValue (or Put/Delete) is not serializable, so it cannot be repartitioned. In the following snippet, the function toKeyValue denotes the transformation to HBase-format data, partitioner is the Partitioner we will define below, and the result is kept as hbaseRdd so that we can save it afterwards.
val hbaseRdd = rdd
  .repartitionAndSortWithinPartitions(partitioner)
  .map { toKeyValue }
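The toKeyValue function is not shown here, so the following is a hypothetical sketch for the (row key bytes, qualifier) keyed records assumed above, with the column family cf as a placeholder.
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Convert a ((rowKey, qualifier), value) record into the
// (ImmutableBytesWritable, KeyValue) pair expected by HFileOutputFormat2.
def toKeyValue(record: ((Array[Byte], String), String)): (ImmutableBytesWritable, KeyValue) = {
  val ((row, qualifier), value) = record
  val kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes(qualifier), Bytes.toBytes(value))
  (new ImmutableBytesWritable(row), kv)
}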
Now we need the partitioner itself. All we have to do is use RegionLocator from HBase to fetch the region start keys. I believe this code snippet is easy to comprehend.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.RegionLocator
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner

protected abstract class HFilePartitioner extends Partitioner {
  // The RDD keys are tuples whose first element is the raw row key.
  def extractKey(n: Any) = n match {
    case (k: Array[Byte], _) => k
    case ((k: Array[Byte], _), _) => k
  }
}

// Maps each row key to the region whose [start key, next start key) range contains it.
private class SingleHFilePartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {
  override def getPartition(key: Any): Int = {
    val k = extractKey(key)
    for (i <- 1 until splits.length)
      if (Bytes.compareTo(k, splits(i)) < 0) return i - 1
    splits.length - 1
  }

  override def numPartitions: Int = splits.length
}

protected def getPartitioner(conf: Configuration, regionLocator: RegionLocator) =
  new SingleHFilePartitioner(regionLocator.getStartKeys)
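Wiring this up is straightforward: open a connection, grab the RegionLocator of the target table, and build the partitioner. The table name below is a placeholder. Note that repartitionAndSortWithinPartitions also requires an implicit Ordering on the key type to be in scope at the call site above; the sketch provides one that delegates to Bytes.compareTo, since HBase orders row keys as unsigned byte arrays.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val regionLocator = connection.getRegionLocator(TableName.valueOf("my_table"))
val partitioner = getPartitioner(conf, regionLocator)

// Must be in scope where repartitionAndSortWithinPartitions is called.
implicit val keyOrdering: Ordering[(Array[Byte], String)] =
  new Ordering[(Array[Byte], String)] {
    def compare(a: (Array[Byte], String), b: (Array[Byte], String)): Int = {
      val c = Bytes.compareTo(a._1, b._1)
      if (c != 0) c else a._2.compareTo(b._2)
    }
  }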
After that, you can save this RDD as HFiles directly with the built-in function saveAsNewAPIHadoopFile, just like this.
hbaseRdd.saveAsNewAPIHadoopFile(
  hFilePath.toString,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  job.getConfiguration)
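The job and hFilePath above are not shown in this post; a minimal sketch of how they could be prepared, reusing conf, connection, and regionLocator from the previous sketch (the table name and output path are still placeholders), might look like this.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.mapreduce.Job

// Hypothetical temporary directory for the generated HFiles.
val hFilePath = new Path("/tmp/hbase-bulk-load")

val table = connection.getTable(TableName.valueOf("my_table"))
val job = Job.getInstance(conf, "hbase-bulk-load")
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])

// Copies the table's compression, block size and bloom filter settings into
// the job configuration so the generated HFiles match the target table.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)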
At last, loading the HFiles into HBase works the same way as above. I have written a simple single-file implementation of bulk loading for HBase in Spark; feel free to use it here.
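For completeness, here is what that final load step could look like in Scala, using the same LoadIncrementalHFiles API as in the MapReduce section and the objects from the sketches above.
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Hand the generated HFiles over to the RegionServers.
val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(hFilePath, connection.getAdmin, table, regionLocator)

connection.close()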