Improve Apache Spark aggregate performance with batching

March 21, 2016 / in Big data & Spark, Seahorse / by Adam Jakubowski

Seahorse provides users with reports on their data at every step in the workflow. A user can view a report after each operation to review the intermediate results. In these reports we provide column distributions: a histogram for continuous data and a pie chart for categorical data. To calculate data reports, Seahorse needs to execute many independent aggregate operations for each data column. We managed to significantly improve aggregate performance by batching those operations together, so that all of them can be executed in just one data pass.
[Figure: Categorical distribution]

[Figure: DataFrame report. Users are able to view distributions of their data after clicking on the chart icon.]

Data Reports in Seahorse

Reports are generated for each DataFrame after every Operation. This means that performance is crucial here.
In our first approach, we used Apache Spark’s built-in `histogram` method on RDD[Double]. We would map our RDD[Row] to an RDD[Double] for each Double column and call `histogram` on each of them.
Unfortunately, this approach is very time-consuming. Calculating the histograms independently meant performing an expensive, full data pass for every single column.
An alternative would be to introduce a special multi-histogram operation working on multiple columns at once. Apache Spark already does something like that for column statistics – its multicolumn statistics method (`MultivariateStatisticalSummary`) calculates statistics for every column in only one data pass.
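For reference, that built-in single-pass computation looks roughly like this with the standard MLlib API (the `getDouble`-based column extraction is just an illustration, not Seahorse code):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Statistics (mean, variance, min, max, ...) for all numeric columns,
// computed by Spark MLlib in a single data pass.
def columnStats(rows: RDD[Row], numColumns: Int): MultivariateStatisticalSummary = {
  val vectors = rows.map { row =>
    Vectors.dense((0 until numColumns).map(row.getDouble).toArray)
  }
  Statistics.colStats(vectors)
}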
This approach was not good enough for us, though. We wanted a generic way to combine arbitrary operations instead of writing a custom multi-operation for every case.

Abstracting over aggregators

An operation producing one value out of a whole dataset is called an aggregation. In order to perform an aggregation, the user must provide three components:

  • `mergeValue` function – folds consecutive elements into the aggregated value within a single partition
  • `mergeCombiners` function – merges aggregated values coming from different partitions
  • `initialElement` – the initial aggregator value

With these three pieces of data, Apache Spark is able to aggregate data within each partition and then combine these partial results to produce the final value.
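As a small illustration (using Spark’s built-in `RDD.aggregate` directly, not Seahorse code), here is how the three components fit together to compute a sum and a count of an RDD[Double] in one pass:

import org.apache.spark.rdd.RDD

def sumAndCount(numbers: RDD[Double]): (Double, Long) =
  numbers.aggregate((0.0, 0L))(
    // mergeValue: fold one element into the partition-local accumulator
    (acc, x) => (acc._1 + x, acc._2 + 1L),
    // mergeCombiners: merge accumulators coming from different partitions
    (a, b) => (a._1 + b._1, a._2 + b._2)
  )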
We introduced an abstract `Aggregator` encapsulating the `mergeValue`, `mergeCombiners` and `initialElement` properties. Thanks to this abstraction, we can create wrappers for our aggregators with additional behaviors.
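The trait itself could look roughly like the following sketch (the exact signatures are assumptions; the `mapInput` wrapper shown here is one example of such an additional behavior and is used in the experiment further below):

// A sketch of the Aggregator abstraction (hypothetical; the definition in the
// Seahorse repository may differ in detail).
trait Aggregator[U, T] extends Serializable {
  def initialElement: U
  def mergeValue(acc: U, elem: T): U
  def mergeCombiners(left: U, right: U): U

  // Adapt the aggregator to another input type, e.g. from a single column
  // value to a whole Row, as in `mapInput(extractDoubleColumn(i))` later on.
  def mapInput[S](f: S => T): Aggregator[U, S] = {
    val self = this
    new Aggregator[U, S] {
      def initialElement: U = self.initialElement
      def mergeValue(acc: U, elem: S): U = self.mergeValue(acc, f(elem))
      def mergeCombiners(left: U, right: U): U = self.mergeCombiners(left, right)
    }
  }
}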
MultiAggregator is one such wrapper. It wraps a sequence of aggregators and executes them in a batch – in a single data-pass. Its implementation involves:

  • `mergeValue` – aggregates results from a single partition for each aggregator
  • `mergeCombiners` – merges aggregated results from the partitions for each aggregator
  • `initialElement` – initial aggregator values for each aggregator
case class MultiAggregator[U, T](aggregators: Seq[Aggregator[U, T]])
    extends Aggregator[Seq[U], T] {

  override def initialElement: Seq[U] = aggregators.map(_.initialElement)

  override def mergeValue(accSeq: Seq[U], elem: T): Seq[U] = {
    (accSeq, aggregators).zipped.map { (acc, aggregator) =>
      aggregator.mergeValue(acc, elem)
    }
  }

  override def mergeCombiners(leftSeq: Seq[U], rightSeq: Seq[U]): Seq[U] = {
    (leftSeq, rightSeq, aggregators).zipped.map { (left, right, aggregator) =>
      aggregator.mergeCombiners(left, right)
    }
  }
}

This special aggregator wraps a sequence of aggregators and then uses their `mergeValue`, `mergeCombiners` and `initialElement` functions to perform aggregation in one batch.

Reading Results in a Type-safe Manner

Each aggregator can return an arbitrary value type. In order to maintain type safety and genericity, we added a special class encapsulating the results of all input aggregators. To read a specific aggregator’s result, we pass the original aggregator object to the batched result. That way, the result type is correctly inferred from the aggregator’s result type.

// Usage
// Aggregators to execute in a batch
val aggregators: Seq[Aggregator[_,_]] = ???
// Batched result computed in one Spark call
val batchedResult: BatchedResult = AggregatorBatch.executeInBatch(rows, aggregators)
// Accessing a result is done by passing the original aggregator to the result object
val firstAggregator = aggregators(0)
// Type of `firstResult` is derived from output type of `firstAggregator`
val firstResult = batchedResult.forAggregator(firstAggregator)

As we can see, each value type is derived from the aggregator’s return type.
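A minimal sketch of how such a batched execution and type-safe lookup could be implemented, assuming the hypothetical `Aggregator` trait from earlier (the actual code in the repository may be organized differently):

import org.apache.spark.rdd.RDD

case class BatchedResult(results: Map[Aggregator[_, _], Any]) {
  // The cast is safe as long as the aggregator passed here is one of the
  // aggregators the batch was executed with; its type parameter carries the
  // result type.
  def forAggregator[U](aggregator: Aggregator[U, _]): U =
    results(aggregator).asInstanceOf[U]
}

object AggregatorBatch {
  def executeInBatch[T](rows: RDD[T], aggregators: Seq[Aggregator[_, T]]): BatchedResult = {
    val multi = MultiAggregator(aggregators.map(_.asInstanceOf[Aggregator[Any, T]]))
    // A single Spark action computes the results of all wrapped aggregators.
    val results = rows.aggregate(multi.initialElement)(multi.mergeValue, multi.mergeCombiners)
    val resultMap: Map[Aggregator[_, _], Any] =
      aggregators.map(a => a: Aggregator[_, _]).zip(results).toMap
    BatchedResult(resultMap)
  }
}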

Measuring and comparing aggregate performance

Let’s measure the time needed to calculate histograms for data rows made up of 5 numeric columns.
In this experiment, we first execute an empty foreach statement to force Spark to cache the data in the cluster (job 0). Otherwise, the first measured job would carry the additional overhead of reading the data.

// job 0 - forces Spark to cache the data in cluster. It will take away data-loading overhead
// from future jobs and make future measurements reliable.
rows.foreach((_) => ())

Then we run Spark’s `histogram` operation for each column (jobs 1-5).

private def sequentialProcessing(rows: RDD[Row], buckets: Array[Double]): Seq[Array[Long]] =
  for (i <- 0 until 5) yield {
    val column = rows.map(extractDoubleColumn(i))
    column.histogram(buckets, true)
  }

Finally, we use our batched aggregator (job 6).

private def batchedProcessing(rows: RDD[Row], buckets: Array[Double]): Seq[Array[Long]] = {
  val aggregators = for (i <- 0 until 5) yield
    HistogramAggregator(buckets, true).mapInput(extractDoubleColumn(i))
  val batchedResult = AggregatorBatch.executeInBatch(rows, aggregators)
  for (aggregator <- aggregators) yield batchedResult.forAggregator(aggregator)
}
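The `extractDoubleColumn` helper used in both snippets is not shown in the post; it could be as simple as the following sketch:

import org.apache.spark.sql.Row

// Hypothetical helper: reads the i-th column of a Row as a Double.
private def extractDoubleColumn(i: Int): Row => Double =
  (row: Row) => row.getDouble(i)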

The results:
[Chart: execution times of the per-column histogram jobs (1-5) compared with the single batched aggregate job (6)]
We went from a number of `histogram` calls that grows linearly with the number of columns to a single batched aggregate call. This method is about 5 times faster for this specific experiment.

Summary

Using only Spark’s built-in `histogram` method, we would be stuck calling it once per numeric column. With our approach we can achieve the same results in a single method call, which yields a large aggregate performance boost. Our method is roughly N times faster (where N stands for the number of columns).
We are also able to batch other arbitrary aggregators along with it. In our case these are:

  • Calculating distributions of categorical columns
  • Counting missing values
  • Calculating min, max and mean values for numerical columns

In the future we could add more operations as long as they comply with the aggregator interface.
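For example, a missing-value counter for a single column could implement the same interface like this (a sketch based on the hypothetical trait above, not code from the repository):

import org.apache.spark.sql.Row

// Counts null values in one column; it can be batched together with any other
// aggregators because it only has to implement the Aggregator interface.
case class MissingValuesAggregator(columnIndex: Int) extends Aggregator[Long, Row] {
  override def initialElement: Long = 0L
  override def mergeValue(acc: Long, row: Row): Long =
    if (row.isNullAt(columnIndex)) acc + 1 else acc
  override def mergeCombiners(left: Long, right: Long): Long = left + right
}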
Our aggregator batching code, along with the above experiment, is available in our GitHub repository.
