An Architecture for Compiling UDF-centric Workflows

Reference: Crotty, Andrew, et al. "An architecture for compiling udf-centric workflows." Proceedings of the VLDB Endowment 8.12 (2015): 1466-1477.

0. Abstract

Data analytics has recently grown to include increasingly sophisticated techniques, such as machine learning and advanced statistics. Users frequently express these complex analytics tasks as workflows of user-defined functions (UDFs) that specify each algorithmic step. However, given typical hardware configurations and dataset sizes, the core challenge of complex analytics is no longer sheer data volume but rather the computation itself, and the next generation of analytics frameworks must focus on optimizing for this computation bottleneck. While query compilation has gained widespread popularity as a way to tackle the computation bottleneck for traditional SQL workloads, relatively little work addresses UDF-centric workflows in the domain of complex analytics.

In this paper, we describe a novel architecture for automatically compiling workflows of UDFs. We also propose several optimizations that consider properties of the data, UDFs, and hardware together in order to generate different code on a case-by-case basis. To evaluate our approach, we implemented these techniques in TUPLEWARE, a new high-performance distributed analytics system, and our benchmarks show performance improvements of up to three orders of magnitude compared to alternative systems.

1. Introduction

  • We argue that UDFs must exist at the core of the optimization process and should no longer be treated as black boxes.


  • We present a novel architecture that leverages LLVM for compiling UDF-centric workflows into distributed programs.
  • We propose several code generation optimizations that consider properties of the data, UDFs, and underlying hardware together.
  • We describe a programming model with explicit shared state and semantics that enable low-level code generation optimizations.
  • We implemented our techniques in TUPLEWARE, and our benchmarks show speedups of up to three orders of magnitude over other systems for common analytics tasks.

2. Overview

  • The UDF Analyzer introspects each UDF by examining the LLVM IR to gather statistics for predicting execution behavior.
  • The Optimizer translates the workflow graph into a distributed program by generating execution code with embedded references to the associated UDFs. During code generation, the Optimizer uses the UDF statistics to apply low-level optimizations that specifically target the underlying hardware.
  • The Linker then merges the LLVM IR for the UDFs with the generated execution code, and the distributed program is then deployed for execution on the cluster.

3. Frontend

3.1 Low-Level Optimizability

  • Low level optimizability for different categories of operators: Apply, Aggregate, Relational, Control.

3.2 Explicit Shared State

  • Shared state is an essential component of complex analytics tasks, but prior attempts to add distributed shared state to existing frameworks restrict how and when UDFs can interact with global variables.
  • We overcome these limitations by providing three different update patterns for reduce UDFs that use Context aggregation variables: (1) parallel (conflicting updates must be commutative and associative); (2) synchronous (exclusive locks prevent conflicting updates); (3) asynchronous (the algorithm must ensure correctness).

3.3 LLVM Foundation

  • Our architecture leverages the LLVM compiler framework to make our Frontend language-agnostic, allowing users to compose workflows of UDFs in a variety of programming languages (e.g., C/C++, Python, Julia, R) with minimal overhead.
  • LLVM also enables UDF introspection to provide certain correctness guarantees at compile time.
  • Our Optimizer can leverage UDF statistics from the LLVM IR to generate better code.

4. Compilation

4.1 UDF Analyzer

The UDF Analyzer examines the LLVM IR of each UDF to determine several features, including: (1) vectorizability (using SIMD); (2) computation cycle estimates; (3) load time (by checking the umber of cycles necessary to fetch UDF operands from memory, we can know it is compute-bound or memory-bound).

4.2 Optimizer

  • High-Level: We utilize well-known query optimization techniques, including predicate pushdown and join reordering.
  • Low-Level: We don’t have associated overheads from interpreted execution models, Volcano-style iterators, or remote procedure calls, and gain many compiler optimizations (e.g., inline expansion, SIMD vectorization) “for free” by compiling workflows.
  • Combined: our Optimizer considers (1) high-level algebra semantics, (2) metadata, and (3) low-level UDF statistics together in order to generate different code on a case-by-case basis.

4.3 Linker

After translating the workflow to a distributed program, the generated code has several embedded references to the supplied UDFs. This code then needs to be merged with the LLVM IR for each referenced UDF. The Linker performs the merging process, using an LLVM pass to combine them with in-line expansion.

5. Optimizations

5.1 Program Structure

5.1.1 Existing Strategies


  • The pipeline strategy maximizes data locality by performing as many sequential operations as possible per tuple. Operations called pipeline breakers force the materialization of intermediate results.
  • The pipeline strategy has the major advantage of requiring only a single pass through the data. Additionally, a tuple is likely to remain in the CPU registers for the duration of processing, resulting in excellent data locality.


  • In contrast, the operator-at-a-time strategy (MonetDB) performs a single operation at a time for all tuples. This bulk processing approach maximizes instruction locality and opportunities for SIMD vectorization.
  • However, the operator-at-a-time strategy requires materialization of intermediate results between each operator, resulting in poor data locality. A tiled variant (Vectorwise) performs each operation on a cache-resident subset of the data, thus reducing materialization costs and saving memory bandwidth, but does not achieve the same level of data locality as the pipeline strategy.

5.1.2 Hybrid Strategy

  • Our strategy first groups all operators into a single pipeline P in order to maximize data locality. Next, for each operator O in P , we leverage the UDF statistics gathered by the UDF Analyzer in order to partition P into a set of vectorizable and nonvectorizable sub-pipelines P’. Intermediate results are materialized between sub-pipelines in cache-resident blocks to reduce the amount of data transferred from memory to the CPU. By the end of the algorithm, all sub-pipelines should be composed uniformly of either vectorizable or nonvectorizable UDFs.
  • If the scalar version is already memory-bound, then the vectorizable sub-pipeline should be merged with the adjacent nonvectorizable sub-pipeline in order to benefit from data locality, since no additional performance increase can be achieved with SIMD vectorization.
  • Heuristic 1a: an operator pipeline should always be partitioned into vectorizable and nonvectorizable sub-pipelines, unless the first operator is memory-bound.

5.2 Aggregation

5.2.1 Group-by

  • HEURISTIC 1b: All group-by reduce operations should be decomposed into two 1-to-1 map operations (the key/hash functions) followed by the aggregation and then optimized using Heuristic 1a.

5.2.2 Context Variables

  • HEURISTIC 2: All references to Context variables inside UDFs should be replaced with static memory locations at compile time by mapping distinct keys to physical address offsets.

5.3 Selection

5.3.1 Predicate Evaluation


  • For each input tuple, a conditional statement checks to see whether that tuple satisfies the predicate. If the predicate is satisfied, then the tuple is added to a result buffer; otherwise, the loop skips the tuple and proceeds to the next tuple.
  • This strategy performs well for both very low and high selectivities, when the CPU can perform effective branch prediction. For intermediate selectivities (i.e., closer to 50%), though, branch mis-prediction penalties have a severe negative impact on performance.


  • This approach maintains a pointer to the current location in the result buffer that is incremented every time an input tuple satisfies the predicate. If a tuple does not satisfy the predicate, then the pointer is not incremented and the previous value is overwritten.
  • This strategy includes no conditional statements, which yields better performance than the branch strategy for intermediate selectivities by avoiding CPU branch mis-predictions.


  • With this technique, predicate evaluation can be partially vectorized because there are no data dependencies in the testing step. Additionally, the resulting code contains tighter loops, thus improving instruction locality.

5.3.2 Result Allocation

  • Tuple-at-a-time: The most conservative approach to result allocation is to allocate space for only a single output tuple each time an input tuple satisfies the predicate. Tuple-at-a-time allocation minimizes the amount of wasted space, but the overhead associated with allocating in such small increments quickly becomes prohibitive for even relatively small data sizes.
  • Max: The other extreme would assume a worst-case scenario and allocate all possible necessary space, thereby paying a larger allocation penalty once at the beginning to completely avoid result bounds checking. This approach may work well for very high selectivities but wastes a lot of space for low selectivity cases.
  • Block: The block allocation strategy is a compromise between the tuple-at-a-time and max strategies. This approach incrementally allocates space for blocks of tuples (e.g., 1024 tuples at a time) in order to balance the required number of allocations and the amount of wasted space.
  • Exact: We can adapt the prepass strategy described in Section 5.3.1 to also maintain a simple counter when computing the bitmap values, allowing us to generate code that only performs bounds checking if the result buffer could overflow.

5.3.3 Cost Model

  • HEURISTIC 3: For all selection operations, choose the combination of evaluation strategy, result strategy, and block size that minimizes the cost for the given parameters (Equation 2).

6. Execution

Load Balancing

  • Our data request model is multitiered and pull- based, allowing for automatic load balancing with minimal overhead.
  • We dedicate a single thread on a single node in the cluster as the Global Manager (GM), which is responsible for global decisions such as the coarse-grained partitioning of data across nodes and supervising the current stage of the execution.
  • we dedicate one thread per node as a Local Manager (LM). The LM is responsible for fine-grained management of the local shared memory, as well as transferring data between nodes. The LM also spawns new Executor (E) threads for running compiled workflows. These threads request data in small cache-sized blocks from the LM, and each LM in turn requests larger blocks of data from the GM, possibly from remote nodes.
  • All remote data requests occur asynchronously, and blocks are requested in advance to mask network transfer latency.

Memory Management

  • The LM is responsible for tracking all active T-Sets and performing garbage collection when necessary. UDFs that allocate their own memory, though, are not managed by the LM’s garbage collector.
  • TUPLEWARE also avoids unnecessary object creations and data copying (e.g., performing updates in-place if the data is not required in subsequent computations).
  • The LM can reorganize and compact the data while idle, potentially even performing on-the-fly compression.

Fault Tolerance

  • TUPLEWARE further improves performance by forgoing traditional fault tolerance mechanisms for short-lived jobs, where the probability of a failure is low and results are easy to fully recompute.
  • Extremely long-running jobs on the order of hours or days, though, might benefit from intermediate result recoverability. In these cases, TUPLEWARE can perform simple k-safe checkpoint replication.

7. Evaluation

7.1 Distributed Benchmarks

7.1.1 Workloads and Data

  • K-means, PageRank, Logistic Regression, Naive Bayes

7.1.2 Discussion

  • TUPLEWARE outperforms Hadoop by up to three orders of magnitude and Spark by up to two orders of magnitude for the tested ML tasks.

7.2 Single Node Benchmarks

7.2.1 Workloads and Data

  • All four ML tasks
  • TPC-H(Q1, Q4, Q6)

7.2.2 Discussion

  • TUPLEWARE outperforms Spark, HyPer, and MonetDB for all ML tasks.
  • The DBMSs can execute the TPC-H queries very efficiently, while TUPLEWARE can achieve better performance than both HyPer and MonetDB on Q6.
  • TUPLEWARE achieves about an order-of-magnitude speedup over Spark for all workloads.

7.3 Performance Breakdown

7.4 Microbenchmarks

  • Programming Model: DryadLINQ, Scope, Spark, Stratosphere
  • Code Generation
  • Single Node Framework
Written on November 15, 2017