Fast and accurate categorical distribution without reshuffling in Apache Spark

February 10, 2016 | Big data & Spark, Seahorse | by Adam Jakubowski

In Seahorse we want to provide our users with accurate distributions of their categorical data. Categorical data can be thought of as the result of an observation that can take one of K possible outcomes. Some examples: nationality, marital status, gender, type of education.

In the described scenario we don’t have prior knowledge of whether a given feature is categorical or not. We would like to treat a feature as categorical as long as its value set is small enough. Moreover, if there are too many distinct values we are no longer interested in its discrete distribution.
Seahorse is built on top of Apache Spark. A naive and easy approach to this problem would be to simply use Spark’s reduceByKey or groupByKey methods. The reduceByKey method operates on key-value pairs and accepts a function that reduces two values into one. If there are many values for one key, they will be reduced to a single value with the provided function.

// limit - max. number of distinct categories to treat a feature as categorical
if(rdd.distinct().count() < limit) {
  // causes partition reshuffling
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
} else {
  None
}

Unfortunately, using these methods leads to poor performance due to Spark’s partition reshuffling.
Counting occurrences of specific values of a feature can be done easily without reshuffling. We can think of calculating a category distribution as counting the occurrences of each category, and this can be done with the RDD’s aggregate function.
The aggregate function reduces a dataset to a single value using a seq function, a comb function and an initial aggregator value. The seq function adds one record to an aggregator; it is used to combine all values from a partition into one aggregated value. The comb function adds two aggregators together; it is used to combine results from different partitions.
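As a quick illustration of aggregate’s signature (a toy sketch, not part of Seahorse), the snippet below computes the sum and the count of an RDD[Int] in a single pass:

import org.apache.spark.rdd.RDD

def sumAndCount(rdd: RDD[Int]): (Long, Long) =
  rdd.aggregate((0L, 0L))(
    // seq: folds one record into a partition-local aggregator
    (acc, value) => (acc._1 + value, acc._2 + 1),
    // comb: merges aggregators produced by different partitions
    (left, right) => (left._1 + right._1, left._2 + right._2)
  )
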
In our case the aggregator value will be of type Option[Map[T, Long]], representing occurrence counts for values of type T, or None if there are too many distinct categorical values.
The comb function (mergeCombiners in the code) adds two aggregator maps together, merging their results. If the resulting map has more distinct categorical values than the specified limit, no distribution is returned (a None).
The seq function (mergeValue in the code) increments the occurrence count of a value in the aggregator map.
Let the code speak for itself:

import scalaz._
import scalaz.Scalaz._
(...) // other imports
object FunctionalDemo {
  // limit - represents the maximum number of categories.
  // if there are more distinct values than `limit`, no categorical distribution will be returned
  case class CategoryDistributionCalculator[T](limit: Int) {
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] =
        rdd.aggregate(zero)(mergeValue _, mergeCombiners _)
    // initial aggregator value
    private val zero = Option(Map.empty[T, Long])
    private def mergeCombiners(
      leftOpt: Option[Map[T, Long]],
      rightOpt: Option[Map[T, Long]]): Option[Map[T, Long]] = {
      for (left <- leftOpt; right <- rightOpt) yield {
        // Adds maps in such a way that numbers from same keys are summed.
        val sumMap = left |+| right
        // yields Some when boolean condition is met. None otherwise
        (sumMap.keySet.size <= limit).option(sumMap)
      }
    }.flatten
    private def mergeValue(acc: Option[Map[T, Long]], next: T): Option[Map[T, Long]] =
      mergeCombiners(acc, Some(Map(next -> 1L)))
  }
  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }
}

The code above is clean, functional and takes advantage of Scalaz. Unfortunately, it allocates a new object for each record in the dataset. With Apache Spark it is sometimes better to favor mutable objects over immutable ones for performance’s sake. According to the documentation, both the seq and comb functions are allowed to modify and return their first argument instead of creating a new object, which avoids memory allocation. The next approach takes advantage of that. The code below is not as pretty, but it performs much better.

import scala.collection.mutable
(...) // other imports
object MutableDemo {
    // limit - the maximum number of distinct categories;
  // if there are more distinct values than `limit`, no categorical distribution will be returned
  case class CategoryDistributionCalculator[T](limit: Int) {
    // None is returned if there are too many distinct values (>limit) to calculate categorical distribution
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] = {
      val mutableMapOpt = rdd.aggregate(zero)(mergeValue _, mergeCombiners _)
      val immutableMapOpt = mutableMapOpt.map(_.toMap)
      immutableMapOpt
    }
    private val zero = Option(mutable.Map.empty[T, Long])
    private def mergeValue(accOpt: Option[mutable.Map[T, Long]],
      next: T): Option[mutable.Map[T, Long]] = {
      accOpt.foreach { acc =>
        addOccurrencesToMap(acc, next, 1)
      }
      replacedWithNoneIfLimitExceeded(accOpt)
    }
    private def mergeCombiners(leftOpt: Option[mutable.Map[T, Long]],
      rightOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      for (left <- leftOpt; rightMap <- rightOpt) {
        rightMap.foreach { case (element, count) =>
          addOccurrencesToMap(left, element, count)
        }
      }
      replacedWithNoneIfLimitExceeded(leftOpt)
    }
    private def addOccurrencesToMap(
      occurrences: mutable.Map[T, Long],
      element: T,
      count: Long): Unit = {
      occurrences(element) = occurrences.getOrElse(element, 0L) + count
    }
    private def replacedWithNoneIfLimitExceeded(
      mapOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      mapOpt.flatMap { map =>
        if (map.size <= limit) mapOpt else None
      }
    }
  }
  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }
}

 

Measurements

We hypothesized that the last approach should be at least two times faster: the `group by` operation works in two stages with a partition reshuffling between them, whereas our solution runs in a single stage.
A simple experiment with Apache Spark seems to confirm that. For the experiment we used:

  • a local Apache Spark cluster,
  • a dataset containing 10,000,000 rows,
  • with 33 labels in the category column (uniformly distributed); a sketch of generating such data is shown after the list.
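
The original post does not show how the test data was generated; a minimal sketch producing a comparable dataset (the label format and the 200 partitions are assumptions) could look like this:

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("categorical-demo").setMaster("local[*]"))

// 10,000,000 rows with 33 uniformly distributed labels, cached so that job 0 below
// can materialize the data before the measured jobs run
val labelCount = 33
val rdd = sc.parallelize(0 until 10000000, numSlices = 200)
  .map(_ => s"label-${Random.nextInt(labelCount)}")
  .cache()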

In the experiment we first execute an empty foreach statement to force Spark to cache the data in the cluster (job 0). Otherwise the first measured job would carry the additional overhead of reading the data.
Jobs 1 and 2 are parts of the first, naive solution: job 1 counts the distinct categories, and job 2 calculates the distribution.
Jobs 3 and 4 represent the two optimised solutions: job 3 is the functional one, and job 4 is the one that takes advantage of mutable objects.

// job 0 - forces Spark to cache the data in the cluster. It takes away the data-loading
// overhead from future jobs and makes the following measurements reliable.
rdd.foreach((_) => ())
// job 1 - checks if there are too many distinct values to treat the feature as categorical
if (rdd.distinct().count() < limit) {
  // job 2 - calculates the categorical distribution
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
} else {
  None
}
// job 3 - optimised functional approach
FunctionalDemo.CategoryDistributionCalculator(limit).calculate(rdd)
// job 4 - optimised mutable approach
MutableDemo.CategoryDistributionCalculator(limit).calculate(rdd)

[Figure: Spark UI view of the executed jobs]
Job 1 (the distinct count) and job 2 (reduceByKey) have 400 tasks and use memory to perform shuffling.
Jobs 3 and 4 have only 200 tasks each, run in a single stage and don’t use any additional memory for shuffling. They also check the category count dynamically.
An interesting observation is that the functional job 3 is slower than the reduceByKey job even though it has only 200 tasks. The memory allocation overhead is that severe!
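As a side note (not part of the original measurements), the extra stage of the naive approach can also be spotted without the Spark UI: printing the lineage of the reduceByKey result with toDebugString shows a ShuffledRDD, which marks the shuffle boundary.

// The reduceByKey pipeline introduces a ShuffledRDD and therefore a second stage.
println(rdd.map(v => v -> 1).reduceByKey(_ + _).toDebugString)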

Summary

Categorical features are very common in data science. With the proposed approach we are able to calculate accurate categorical distributions quickly and without prior knowledge of whether a feature is categorical or not. It is guaranteed to execute in a single Apache Spark job, without any partition reshuffling.
