How to Integrate Your Stuff into Spark SQL

Spark SQL is a module in Apache Spark that enables relational processing (e.g., declarative queries) using Spark’s functional programming API. Spark SQL also provides a declarative DataFrame API to bridge between relational and procedural processing. It supports both external data sources (e.g., JSON, Parquet and Avro) and internal data collections (i.e., RDDs). Besides, it uses a highly extensible optimizer Catalyst, making it easy to add complex rules, control code generation, and define extension points.

In this post, I am going to show how to integrate your algorithms into Spark SQL, enabling SQL and DataFrame API for your self-made analytics. I am going to use Spark 2.2.0 wit Scala 2.11, which is the latest version for now, with IntelliJ IDEA as the IDE (the best IDE for Java/Scala!).

0. Preparation

0.1 Download the Source Code

Please download the source code for Spark 2.2.0, which I get from their mirror repository on GitHub.

0.2 Configure Everything

Since we use IntelliJ for development, you can consult the official guide for setuping up the IDE. Besides, you should do the following things:

  • Install the ANTLR v4 plug-in for IntelliJ which will be used in Section 1.
  • Go to View > Tool Windows > Maven Projects and add hadoop-2.6, hive-provided, hive-thriftserver, yarn in Profiles (there are some default profiles as well, don’t change them). Then Reimport All Maven Projects (the first button on upper-right corner), Generate Sources and Update Folders For All Projects (the second button on upper-right corner).
  • Rebuild the whole project, which would fail but is essential for following steps.
  • Marking Generated Sources:
    • Go to File > Project Structure > Project Settings > Modules. Find spark-streaming-flume-sink, and mark target/scala-2.11/src_managed/main/compiled_avro as source. (Click on the Sources on the top to mark)
    • Go to File > Project Structure > Project Settings > Modules. Find spark-hive-thriftserver, and mark src/gen/java as source. (Click on the Sources on the top to mark)
  • Rebuild the whole project again, which should work well now. If there still exist some compilation errors for not finding some classes, you may return to last step and marking corresponding sources if not included.

0.3 Our Goal

Actually what I am going to do is integrating our distributed trajectory analytics systems into Spark SQL, which was recently accepted as a full paper by SIGMOD 2018. In general, we support distributed similarity-based trajectory search/join with SQL and DataFrame API.

For example, if we have a table $T(id: Long, traj: trajectory)$, we could perform a self-join on it with the following SQL command

SELECT * FROM T t1 JOIN T t2 ON DTW(t1.traj, t2.traj) <= 0.005

or with the following DataFrame API

t.traj_join(t2, DTW)

where DTW (Dynamic Time Warping) is one of the algorithms for measuring similarity between two temporal sequences, which may vary in speed. In our paper, we also support Fréchet distance, LCSS and EDR.

In the following sections, I will show how to support the above two syntaxes in Spark SQL.

1. Supporting SQL Syntax

1.1 Lexical Analysis

Spark 2.2.0 utilizes ANTLR 4 for its SQL Syntax functionality (for both lexical and syntax analysis), which is defined in sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4.

Since we want to extend the predicate following ON keyword in the join command, we should first look at the JoinCriteria rule. We find ON is followed by another rule booleanExpression. Repeating this process for find the correct clause, we will follow booleanExpression -> predicated -> valueExpression -> primaryExpression. Thus, we will add the rule for DTW(t1.traj, t2.traj) in primaryExpression (or you can add it in valueExpression, which is also fine).

We will add one row for the rules of primaryExpression, which is

| function=trajectorySimilarityFunction '(' left=primaryExpression ',' right=primaryExpression ')'  #trajectorySimilarity

Here we add another sub-rule trajectorySimilarityFunction, which could be DTW, Frechet, LCSS or EDR.

// Trajectory Similarity Functions
trajectorySimilarityFunction
    : DTW
    | FRECHET
    | EDR
    | LCSS
    ;

DTW: 'DTW';
FRECHET: 'FRECHET';
EDR: 'EDR';
LCSS: 'LCSS';

Now we’ve added all the rules for lexical analysis, we should Generate Sources and Update Folders For All Projects (as mentioned in Section 0.2) to make ANTLR generate Java code for the new lexer.

1.2 Syntax analysis

The syntax analysis in Spark SQL will build a AST-tree as the initial logical plan, in which the “visitor” mode is used. In other words, we will visit the corresponding lexical tree node for TrajectorySimilarity and update it with syntactic information. In other words, we will generate a new tree node, as shown in the following code snippet

case class TrajectorySimilarityFunction(function: trajectorySimilarityFunction, traj1: Expression, traj2: Expression)
    extends BinaryExpression with CodegenFallback {

    override def left: Expression = traj1
    override def right: Expression = traj2

    override def dataType: DataType = DoubleType

    override def nullSafeEval(traj1: Any, traj2: Any): Any = function match {
        case DTW =>
        TrajectorySimilarity.DTW.evalWithTrajectory(traj1, traj2)
    }
}

We extend the base class BinaryExpression which is used for expressions with two inputs (traj1 and traj2 in our setting), and CodegenFallback will bypass the code generation process for us. We put it under the directory sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions.

Then we will add a new method in AstBuilder which converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or TableIdentifier. This method will visit the TrajectorySimilarity node in the ANTLR4 ParseTree and transform it to the above-created expression TrajectorySimilarityExpression.

override def visitTrajectorySimilarity(ctx: TrajectorySimilarityContext): Expression = withOrigin(ctx) {
    ctx.function match {
        case function if function.DTW != null => TrajectorySimilarityExpression(DTW, expression(ctx.left), expression(ctx.right))
    }
}

After these steps, we are now able to support executing SQL queries.

2. Supporting DataFrame API

It is pretty trivial to support DataFrame API now. All you need to do is adding a new method in Dataset.

def trajSimJoin(right: Dataset[_], leftKey: Column, rightKey: Column,
                function: TrajectorySimilarityFunction, threshold: Double): DataFrame = withPlan {
    val trajectorySimilarityExpression = TrajectorySimilarityExpression(function, leftKey.expr, rightKey.expr)
    val condition = LessThanOrEqual(trajectorySimilarityExpression, Literal(threshold))
    Join(logicalPlan, right.logicalPlan, joinType = Inner, Some(condition))
}

This method generates the same logical plan as we did in Section 1. Suppose we have two DataFrames df1 and df2, we can call trajSimJoin now:

df1.trajSimJoin(df2, df1("traj"), df2("traj"), DTW, 0.005).show()

3. Self-defined Execution

Although we’ve supported both SQL and DataFrame API in the front-end, the back-end execution is just a built-in Cartesian product join. To support the execution of our own similarity join algorithm, we need to go further.

For the back-end execution, we are given the logical plan either from the syntax analysis of SQL or DataFrame API. Then we are supposed to transform the logical plan to the corresponding physical plan for actual execution. But before transforming, let’s first construct the physical plan for our own join algorithm.

3.1 Constructing Physical Plan

The prototype of the physical plan is similar to that of the logical plan since they refer to the same thing. We just need to modify the doExecute() method to implement our own algorithm.

case class TrajectorySimilarityJoinExec(leftKey: Expression, rightKey: Expression,
                                        function: TrajectorySimilarityFunction, threshold: Literal,
                                        left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
  override def output: Seq[Attribute] = left.output ++ right.output

  protected override def doExecute(): RDD[InternalRow] = {
    val leftRDD = left.execute().map(row => toTrajectory(row))
    val rightRDD = right.execute().map(row => toTrajectory(row))

    // Our Algorithm Here
  }
}

3.2 Transforming Logical Plan to Physical Plan

This part is defined in SparkStrategies.scala, since we only focus on join, we are going to only modify the apply method of JoinSelection. Since we only care about the trajectory similarity join, we add another case here to handle it.

plan match {
    case ExtractTrajectorySimilarityJoin(joinType, leftKey, rightKey, function, threshold, left, right) =>
            TrajectorySimilarityJoinExec(leftKey, rightKey, function, threshold, planLater(left), planLater(right)) :: Nil
    ...

So what is ExtractTrajectorySimilarityJoin? It utilizes an interesting feature called extractor in Scala, which transforms plan to (joinType, leftKey, rightKey, function, threshold, left, right) here. So it is trivial to implement it to handle trajectory similarity join as followings. Please note that if the pattern does not match (case _ here), we should return None.

object ExtractTrajectorySimilarityJoin extends Logging with PredicateHelper {
  type ReturnType =
    (JoinType, Expression, Expression, TrajectorySimilarityFunction, Literal, LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    case logical.Join(left, right, joinType, condition) =>
      logDebug(s"Considering join on: $condition")
      if (condition.isDefined) {
        condition.get match {
          case LessThanOrEqual(TrajectorySimilarityExpression(function, traj1, traj2), threshold) =>
            Some((joinType, traj1, traj2, function, threshold.asInstanceOf[Literal], left, right))
          case _ => None
        }
      } else {
        None
      }
    case _ => None
  }
}

4. Test

We could create test examples in the examples submodule of Spark to make sure our modifications work. Please note that the scopes of some dependencies should be marked as compile to avoid missing class error at runtime. I put my profile here for you to add into examples/pom.xml and enable it in IntelliJ.

<profile>
    <id>dita</id>
    <properties>
    <hadoop.deps.scope>compile</hadoop.deps.scope>
    <parquet.deps.scope>compile</parquet.deps.scope>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>14.0.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-http</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-continuation</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-util</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-security</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-plus</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
            <version>${jetty.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</profile>

5. References

I will release the paper and code soon.

Written on November 7, 2017