Bulk loading is an HBase feature for ingesting large amounts of data efficiently. In this post, I am going to share some basic concepts of bulk loading and how to practice it in MapReduce and Spark.

I highly recommend reading this post from Cloudera first, since it illustrates almost everything about bulk loading in HBase. I will then provide some supplementary knowledge here.

Why is bulk loading efficient? When we perform a normal Put or Delete on HBase, HBase first logs the operation in the Write-Ahead Log (WAL) and stores the entry in the MemStore. Whenever there are enough entries in the MemStore, the RegionServer flushes it to disk as an HFile (the on-disk component of HBase's LSM tree). Bulk loading instead generates HFiles and loads them into HBase directly, bypassing the write path. See HBase Write Path for more details.
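To make the contrast concrete, here is a toy model of the write path, with all names my own invention rather than HBase APIs: a MemStore behaves like a sorted in-memory map that is flushed to an immutable sorted file once it grows large enough, while bulk loading appends a pre-sorted file directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of the write path: entries accumulate in a sorted MemStore
// and are flushed to an immutable, sorted "HFile" once a threshold is hit.
public class WritePathSketch {
    static final int FLUSH_THRESHOLD = 3;
    TreeMap<String, String> memStore = new TreeMap<>();
    List<List<String>> hFiles = new ArrayList<>();   // each flush = one sorted file

    void put(String rowKey, String value) {
        memStore.put(rowKey, value);                 // (WAL write omitted in this sketch)
        if (memStore.size() >= FLUSH_THRESHOLD) flush();
    }

    void flush() {
        List<String> hFile = new ArrayList<>();
        memStore.forEach((k, v) -> hFile.add(k + "=" + v));  // already sorted by key
        hFiles.add(hFile);
        memStore.clear();
    }

    // Bulk loading corresponds to appending a pre-sorted file directly,
    // skipping the WAL and MemStore above.
    void bulkLoad(List<String> presortedFile) {
        hFiles.add(presortedFile);
    }
}
```

The key observation is that HFiles are sorted by row key, which is why bulk loading can produce them offline and hand them over wholesale.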

## 2. Practice in MapReduce

Let’s look at the following figure first to get a better understanding of what happens during the ETL process of bulk loading.

As this post suggests, we need to write the Mapper ourselves; it is supposed to emit the row key as the key, and either a KeyValue, a Put, or a Delete as the value. The Reducer is handled by HBase.
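As a rough sketch of such a Mapper's core logic (the input format and the helper name here are hypothetical, not part of HBase), the map step only parses each record and pairs the row key with the cell to write; in a real job this pair would be an (ImmutableBytesWritable, Put) emitted via context.write(), and all shuffling and sorting would be left to the framework.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical parse step of a bulk-load Mapper: turn one CSV record
// "id,value" into a (rowKey, cellValue) pair.
public class MapperSketch {
    static Map.Entry<byte[], String> toEmit(String csvLine) {
        String[] fields = csvLine.split(",", 2);
        byte[] rowKey = fields[0].getBytes();        // row key = first column
        return new SimpleEntry<>(rowKey, fields[1]); // value = remaining columns
    }
}
```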

The following snippet configures the MapReduce job. The variable bulkLoadPath denotes the path where the generated HFiles will be located, and tableName is the name of the HBase table.

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(bulkLoadPath));
HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration, tableName));


Further, this snippet shows how to load these HFiles into HBase after the MapReduce job finishes.

LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loadHFiles.doBulkLoad(new Path(bulkLoadPath), hTable);


## 3. Practice in Spark

Things tend to be a little tougher in Spark. As far as I know, there is no official support for direct bulk loading in Spark, but there is a really helpful library, HBaseRDD, that sheds light on this issue. I will try to illustrate its methods in a systematic way.

It is time to look at this figure again; now assume that we have finished the EXTRACT phase and have an RDD.

However, now we don’t have a MapReduce job to do everything for us, so why not simulate it? For the Mapper, we just need to generate the corresponding format, say, (rowKey, KeyValue) (or Put/Delete) pairs; for the Reducer, it does the following:

• Inspects the table to configure a total order partitioner
• Uploads the partitions file to the cluster and adds it to the DistributedCache
• Sets the number of reduce tasks to match the current number of regions
• Sets the output key/value class to match HFileOutputFormat’s requirements
• Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
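The partition lookup behind the first step can be sketched in plain code: given the sorted region start keys, each row key goes to the last region whose start key is not greater than it, comparing bytes as unsigned values the way HBase's Bytes.compareTo does. The class below is my own illustration, not an HBase API.

```java
// Sketch of a total-order partition lookup over sorted region start keys.
public class RegionLookup {
    // Unsigned lexicographic comparison, matching HBase's byte ordering.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    // splits = sorted region start keys (splits[0] is the first region's start).
    // A row key belongs to the last region whose start key <= key.
    static int getPartition(byte[] key, byte[][] splits) {
        for (int i = 1; i < splits.length; i++)
            if (compareUnsigned(key, splits[i]) < 0) return i - 1;
        return splits.length - 1;
    }
}
```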

Thus, it is clear that we need to partition the key-value data into different regions and sort it. This seems easy, because Spark provides a built-in RDD function, repartitionAndSortWithinPartitions, once we supply a partitioner. It is important to note that we CANNOT transform the RDD to HBase-format data before this step: KeyValue (and Put/Delete) are not serializable, so repartitioning them would cause a serialization error. In the following snippet, the function toKeyValue denotes the transformation to HBase-format data, and partitioner is the Partitioner class we will define below.

rdd
.repartitionAndSortWithinPartitions(partitioner)
.map {toKeyValue}
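To make the order of operations concrete: the shuffle first routes each still-serializable record to its target partition and sorts within each partition, and only afterwards do we map to the non-serializable HBase types. A plain-Java simulation of that shuffle step (the partitioner and data here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;

// Simulation of repartitionAndSortWithinPartitions for String keys:
// route each record to its partition, then sort every partition by key.
public class ShuffleSketch {
    static List<List<String>> repartitionAndSort(List<String> keys,
                                                 int numPartitions,
                                                 ToIntFunction<String> partitioner) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (String k : keys) parts.get(partitioner.applyAsInt(k)).add(k);
        for (List<String> p : parts) p.sort(Comparator.naturalOrder());
        return parts;
    }
}
```

Because every partition ends up sorted and aligned with one region, each partition can be written out as one HFile that the region accepts wholesale.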


Now we need the partitioner. All we have to do is use RegionLocator from HBase to obtain the region boundaries. I believe this code snippet is easy to comprehend.

protected abstract class HFilePartitioner extends Partitioner {
  def extractKey(n: Any) = n match {
    case (k: Array[Byte], _) => k        // (rowKey, value)
    case ((k: Array[Byte], _), _) => k   // ((rowKey, qualifier), value)
  }
}

private class SingleHFilePartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {
  // splits holds the sorted region start keys; a key belongs to the last
  // region whose start key is not greater than it.
  override def getPartition(key: Any): Int = {
    val k = extractKey(key)
    for (i <- 1 until splits.length)
      if (Bytes.compareTo(k, splits(i)) < 0) return i - 1

    splits.length - 1
  }

  override def numPartitions: Int = splits.length
}

// HFilePartitioner's companion object (omitted here) builds the concrete
// partitioner from the region start keys.
protected def getPartitioner(conf: Configuration, regionLocator: RegionLocator) =
  HFilePartitioner(conf, regionLocator.getStartKeys)


After that, you can save this RDD as HFiles directly using the built-in function saveAsNewAPIHadoopFile, just like this.

rdd
  .saveAsNewAPIHadoopFile(bulkLoadPath, classOf[ImmutableBytesWritable],
    classOf[KeyValue], classOf[HFileOutputFormat], conf)