How to Integrate Your Stuff into Spark SQL
Spark SQL is a module in Apache Spark that enables relational processing (e.g., declarative queries) integrated with Spark's functional programming API. Spark SQL also provides a declarative DataFrame API to bridge relational and procedural processing. It supports both external data sources (e.g., JSON, Parquet and Avro) and internal data collections (i.e., RDDs). Moreover, it is built around a highly extensible optimizer, Catalyst, which makes it easy to add complex rules, control code generation, and define extension points.
In this post, I am going to show how to integrate your own algorithms into Spark SQL, enabling SQL and DataFrame APIs for your self-made analytics. I will use Spark 2.2.0 with Scala 2.11, the latest version at the time of writing, and IntelliJ IDEA as the IDE (the best IDE for Java/Scala!).
0. Preparation
0.1 Download the Source Code
Please download the source code of Spark 2.2.0, which I got from the mirror repository on GitHub.
0.2 Configure Everything
Since we use IntelliJ for development, you can consult the official guide for setting up the IDE. In addition, you should do the following:
- Install the ANTLR v4 plug-in for IntelliJ which will be used in Section 1.
- Go to View > Tool Windows > Maven Projects and add `hadoop-2.6`, `hive-provided`, `hive-thriftserver` and `yarn` in Profiles (there are some default profiles as well; don't change them). Then Reimport All Maven Projects (the first button in the upper-right corner) and Generate Sources and Update Folders For All Projects (the second button in the upper-right corner).
- Rebuild the whole project. The build will fail, but this step is essential for the following steps.
- Mark generated sources:
  - Go to File > Project Structure > Project Settings > Modules. Find `spark-streaming-flume-sink` and mark `target/scala-2.11/src_managed/main/compiled_avro` as a source folder (click Sources at the top to mark it).
  - Go to File > Project Structure > Project Settings > Modules. Find `spark-hive-thriftserver` and mark `src/gen/java` as a source folder (click Sources at the top to mark it).
- Rebuild the whole project again, which should now succeed. If there are still compilation errors about missing classes, return to the previous step and mark the corresponding source folders that were not included.
0.3 Our Goal
What I am actually going to do is integrate our distributed trajectory analytics system into Spark SQL; the work was recently accepted as a full paper at SIGMOD 2018. In general, we support distributed similarity-based trajectory search/join through SQL and the DataFrame API.
For example, if we have a table $T(id: Long, traj: trajectory)$, we could perform a self-join on it with the following SQL command
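Roughly, the query could look like the following sketch (the exact syntax is illustrative and the threshold value `0.005` is arbitrary):

```scala
// A sketch of the SQL form: a DTW-based self-join on T with an assumed distance threshold
spark.sql(
  """
    |SELECT t1.id, t2.id
    |FROM T AS t1 JOIN T AS t2
    |ON DTW(t1.traj, t2.traj) <= 0.005
  """.stripMargin).show()
```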
or with the following DataFrame API
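Again a sketch: `trajSimJoin` is the method we will add in Section 2, and its parameter list here is an assumption:

```scala
// A sketch of the DataFrame form of the same self-join (hypothetical trajSimJoin signature)
val df = spark.table("T")
df.trajSimJoin(df, df("traj"), df("traj"), "DTW", 0.005).show()
```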
where DTW (Dynamic Time Warping) is an algorithm for measuring the similarity between two temporal sequences that may vary in speed. In our paper, we also support the Fréchet distance, LCSS and EDR.
In the following sections, I will show how to support the above two syntaxes in Spark SQL.
1. Supporting SQL Syntax
1.1 Lexical Analysis
Spark 2.2.0 utilizes ANTLR 4 for its SQL syntax functionality (both lexical and syntax analysis), which is defined in `sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4`.
Since we want to extend the predicate following the `ON` keyword in the join command, we should first look at the `joinCriteria` rule. We find that `ON` is followed by another rule, `booleanExpression`. Repeating this process to find the correct clause, we follow `booleanExpression` -> `predicated` -> `valueExpression` -> `primaryExpression`. Thus, we will add the rule for `DTW(t1.traj, t2.traj)` in `primaryExpression` (or you can add it in `valueExpression`, which is also fine).
We will add one new alternative to the rules of `primaryExpression`, as shown in the sketch below.
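A rough sketch of the grammar changes (the rule, label and token names here are illustrative, not necessarily the ones used in the actual implementation):

```antlr
// One additional alternative inside the primaryExpression rule:
    | trajectorySimilarityFunction '(' left=primaryExpression ',' right=primaryExpression ')'
        #trajectorySimilarity

// A new sub-rule listing the supported distance functions:
trajectorySimilarityFunction
    : DTW | FRECHET | LCSS | EDR
    ;

// The corresponding keywords, declared alongside the other lexer tokens:
DTW: 'DTW';
FRECHET: 'FRECHET';
LCSS: 'LCSS';
EDR: 'EDR';
```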
Here we also add another sub-rule, `trajectorySimilarityFunction`, which could be `DTW`, `Frechet`, `LCSS` or `EDR`.
Now that we have added all the rules for lexical analysis, we should Generate Sources and Update Folders For All Projects (as mentioned in Section 0.2) so that ANTLR generates the Java code for the new lexer.
1.2 Syntax analysis
The syntax analysis in Spark SQL builds an AST as the initial logical plan, using the visitor pattern. In other words, we visit the corresponding parse-tree node for the trajectory similarity function and turn it into a new tree node carrying the syntactic information, as shown in the following code snippet.
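A minimal sketch of such an expression, assuming a simple representation of the distance function as a string and a double-valued distance (the actual fields in the paper's implementation may differ):

```scala
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, DoubleType}

case class TrajectorySimilarityExpression(
    function: String,      // one of DTW, Frechet, LCSS, EDR
    traj1: Expression,     // left trajectory column
    traj2: Expression)     // right trajectory column
  extends BinaryExpression with CodegenFallback {

  override def left: Expression = traj1
  override def right: Expression = traj2

  // assumed: the expression yields a distance that is later compared against a threshold
  override def dataType: DataType = DoubleType

  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
    // placeholder: the actual distance computation is carried out by the physical join
    // operator built in Section 3, not by row-at-a-time evaluation
    throw new UnsupportedOperationException(
      s"$function similarity is evaluated by the trajectory similarity join operator")
  }
}
```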
We extend the base class `BinaryExpression`, which is used for expressions with two inputs (`traj1` and `traj2` in our setting), and mix in `CodegenFallback`, which bypasses the code generation process for us. We put the file under the directory `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions`.
Then we will add a new method in `AstBuilder`, the class that converts an ANTLR 4 ParseTree into a Catalyst Expression, LogicalPlan or TableIdentifier. This method visits the trajectory similarity node in the ANTLR 4 ParseTree and transforms it into the `TrajectorySimilarityExpression` created above.
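A sketch of what this visitor method could look like inside `AstBuilder.scala`; the context class and field names (`TrajectorySimilarityContext`, `left`, `right`) follow the grammar sketch from Section 1.1 and are therefore assumptions:

```scala
// Visit the #trajectorySimilarity alternative and build the catalyst expression
override def visitTrajectorySimilarity(
    ctx: TrajectorySimilarityContext): Expression = withOrigin(ctx) {
  val function = ctx.trajectorySimilarityFunction.getText  // DTW, Frechet, LCSS or EDR
  TrajectorySimilarityExpression(function, expression(ctx.left), expression(ctx.right))
}
```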
After these steps, we are now able to support executing SQL queries.
2. Supporting DataFrame API
It is pretty trivial to support the DataFrame API now. All you need to do is add a new method in `Dataset`.
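A sketch of such a method, added inside `Dataset.scala`. It builds an inner join whose condition is the `TrajectorySimilarityExpression` from Section 1.2 compared against a threshold, mirroring the SQL form; the name `trajSimJoin` matches the usage below, but the exact signature is an assumption:

```scala
// Requires, inside Dataset.scala:
//   import org.apache.spark.sql.catalyst.expressions.{LessThanOrEqual, Literal, TrajectorySimilarityExpression}
//   import org.apache.spark.sql.catalyst.plans.Inner
//   import org.apache.spark.sql.catalyst.plans.logical.Join
def trajSimJoin(
    right: Dataset[_],
    leftTrajCol: Column,
    rightTrajCol: Column,
    function: String,
    threshold: Double): DataFrame = withPlan {
  // inner join with condition: similarity(leftTraj, rightTraj) <= threshold
  Join(
    logicalPlan,
    right.logicalPlan,
    Inner,
    Some(LessThanOrEqual(
      TrajectorySimilarityExpression(function, leftTrajCol.expr, rightTrajCol.expr),
      Literal(threshold))))
}
```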
This method generates the same logical plan as we built in Section 1. Suppose we have two DataFrames `df1` and `df2`; we can now call `trajSimJoin`:
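For example (the column name follows the schema $T(id: Long, traj: trajectory)$ from Section 0.3, and the threshold value is arbitrary):

```scala
// Hypothetical usage of the trajSimJoin sketch above
val result = df1.trajSimJoin(df2, df1("traj"), df2("traj"), "DTW", 0.005)
result.show()
```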
3. Self-defined Execution
Although we now support both SQL and the DataFrame API in the front end, the back-end execution is still just the built-in Cartesian product join. To support the execution of our own similarity join algorithm, we need to go further.
For the back-end execution, we are given the logical plan produced either by the SQL syntax analysis or by the DataFrame API. We are then supposed to transform the logical plan into the corresponding physical plan for actual execution. But before transforming, let's first construct the physical plan for our own join algorithm.
3.1 Constructing Physical Plan
The prototype of the physical plan is similar to that of the logical plan, since they describe the same operation. We just need to implement our own algorithm in the `doExecute()` method.
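A minimal sketch of such a physical operator (the name `TrajectorySimilarityJoinExec` and its fields are assumptions); the `doExecute()` body below is only a naive Cartesian placeholder with no similarity filtering, standing in for the distributed similarity join algorithm from the paper:

```scala
package org.apache.spark.sql.execution.joins

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow}
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}

case class TrajectorySimilarityJoinExec(
    function: String,        // DTW, Frechet, LCSS or EDR
    threshold: Double,
    leftKey: Expression,     // trajectory column on the left side
    rightKey: Expression,    // trajectory column on the right side
    left: SparkPlan,
    right: SparkPlan) extends BinaryExecNode {

  override def output: Seq[Attribute] = left.output ++ right.output

  override protected def doExecute(): RDD[InternalRow] = {
    // Placeholder: pair every left row with every right row. The real algorithm would
    // partition and index trajectories, prune candidate pairs with `function` and
    // `threshold`, and verify only the surviving pairs.
    val leftRows = left.execute().map(_.copy())
    val rightRows = right.execute().map(_.copy())
    leftRows.cartesian(rightRows).mapPartitions { iter =>
      val joined = new JoinedRow
      iter.map { case (l, r) => joined(l, r) }
    }
  }
}
```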
3.2 Transforming Logical Plan to Physical Plan
This part is defined in `SparkStrategies.scala`. Since we only focus on joins, we modify just the `apply` method of `JoinSelection`, and since we only care about the trajectory similarity join, we add another `case` here to handle it.
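A sketch of the extra `case`, placed alongside the existing join cases inside `JoinSelection.apply` (both `ExtractTrajectorySimilarityJoin` and `TrajectorySimilarityJoinExec` are the sketches from this post, not Spark built-ins):

```scala
// Plan a trajectory similarity join with our own physical operator
case ExtractTrajectorySimilarityJoin(
    _, leftKey, rightKey, function, threshold, left, right) =>
  TrajectorySimilarityJoinExec(
    function, threshold, leftKey, rightKey, planLater(left), planLater(right)) :: Nil
```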
So what is `ExtractTrajectorySimilarityJoin`? It uses an interesting Scala feature called an extractor, which destructures `plan` into `(joinType, leftKey, rightKey, function, threshold, left, right)` here. It is therefore straightforward to implement it to handle the trajectory similarity join, as follows. Please note that if the pattern does not match (the `case _` branch), we should return `None`.
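A sketch of the extractor, assuming the join condition has the shape produced in Sections 1 and 2 (a `TrajectorySimilarityExpression` compared against a double literal threshold):

```scala
package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions.{Expression, LessThanOrEqual, Literal, TrajectorySimilarityExpression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.types.DoubleType

object ExtractTrajectorySimilarityJoin {
  // (joinType, leftKey, rightKey, function, threshold, left, right)
  type ReturnType =
    (JoinType, Expression, Expression, String, Double, LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    // a join whose condition is `TrajectorySimilarityExpression <= literal threshold`
    case Join(left, right, joinType,
        Some(LessThanOrEqual(
          TrajectorySimilarityExpression(function, leftKey, rightKey),
          Literal(threshold, DoubleType)))) =>
      Some((joinType, leftKey, rightKey, function,
        threshold.asInstanceOf[Double], left, right))
    // not a trajectory similarity join: let other strategies handle the plan
    case _ => None
  }
}
```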
4. Test
We can create test examples in the `examples` submodule of Spark to make sure our modifications work. Please note that the scope of some dependencies should be set to `compile` to avoid missing class errors at runtime. I put my profile here for you to add to `examples/pom.xml` and enable in IntelliJ.
5. References
I will release the paper and code soon.