Improve Apache Spark aggregate performance with batching

Improve Apache Spark aggregate performance with batching

Seahorse provides users with reports on their data at every step in the workflow. A user can view reports after each operation to review the intermediate results. In our reports we provide users with distributions for columns in the form of a histogram for continuous data, and a pie chart for categorical data. In order to calculate data reports Seahorse needs to execute many independent aggregate operations for each data column. We managed to significantly improve aggregate performance by batching those operations together. That way those aggregations can be executed in just one data-pass.

categorical

Categorical distribution

 

transform_report-1

DataFrame report. Users are able to view distributions of their data after clicking on the chart icon.

 

Data Reports in Seahorse

Reports are generated for each DataFrame after every Operation. This means that performance is crucial here.

In our first approach, we used Apache Spark’s built-in histogram on RDD[Double] feature. We would initially map our RDD[Row] to RDD[Double] for each Double column and call histogram for each one.

Unfortunately this approach is very time-intensive. Calculating histograms for each column independently meant that we were performing expensive, full-data passes per each column.

Another alternative would be to introduce a form of special multi-histogram operation which would operate on multiple columns. Apache Spark already does that for column statistics – there is a Multicolumn Statistics method that calculates column statistics for each column in only one data pass (MultivariateStatisticalSummary).

This approach was not good enough for us. We would like to have a generic solution to combine arbitrary operations together instead of writing custom multi-operations.

Abstracting over aggregator

An operation producing one value out of a whole dataset is called aggregation. In order to perform an aggregation, the user must provide three components:

  • mergeValue function – aggregates results from a single partition
  • mergeCombiners function – merges aggregated results from the partitions
  • initialElement – the initial aggregator value.

With these three pieces of data, Apache Spark is able to aggregate data across each partition and then combine these partial results together to produce final value.

We introduced an abstract Aggregator encapsulating the mergeValue, mergeCombiners and, initialElement properties. Thanks to this abstraction we can create Wrappers for our aggregators with additional behaviors.

MultiAggregator is one such wrapper. It wraps a sequence of aggregators and executes them in a batch – in a single data-pass. Its implementation involves:

  • mergeValue – aggregates results from a single partition for each aggregator
  • mergeCombiners – merges aggregated results from the partitions for each aggregator
  • initialElement – initial aggregator values for each aggregator

This special aggregator wraps a sequence of aggregators and then uses their mergeValue, mergeCombiners and initialElement functions to perform aggregation in one batch.

Reading Results in a Type-safe Manner

Each aggregator can return an arbitrary value type. In order to maintain type safety and genericity we added a special class encapsulating results for all input aggregators. In order to get specific aggregator result, we pass initial aggregator object to batched result. That way result type is correctly inferred from the aggregator’s result type.

As we can see, each value type is derived from the aggregator’s return type.

Measurements and comparing aggregate performace

Let’s measure time for calculating histograms for data rows made up of 5 numeric columns.

In this experiment, we first execute an empty foreach statement to force Spark to cache the data in the clusters (job 0). Otherwise, the first job would have additional overhead for reading the data.

Then we will run Spark’s histogram operation for each column (jobs 1- 5).

And use our batched aggregator (job 6).

The results:

measurements

We improved from a linear number of histogram calls against column number to a single  batched aggregate call. This method is about 5 times faster for this specific experiment.

Summary

Using only Spark’s built-in histogram method we would be stuck with calling it for each numeric column. Using our approach we can achieve the same results in a single method call which yields a large aggregate performance boost. Our method is roughly N times faster (where N stands for the number of columns).

We are also able to batch other arbitrary aggregators along with it. In our case they would be:

  • Calculating distributions of categorical columns
  • Counting missing values
  • Calculating min, max and mean values for numerical columns

In the future we could add more operations as long as they comply with the aggregator interface.

Here is our aggregator batching code along with the above experiment: GitHub repository

0 replies

Leave a Reply

Want to join the discussion?
Feel free to contribute!

Leave a Reply

Your email address will not be published. Required fields are marked *