Fast and accurate categorical distribution without reshuffling in Apache Spark

In Seahorse we want to provide our users with accurate distributions of their categorical data. Categorical data can be thought of as the results of an observation that can take one of K possible outcomes. Some examples: Nationality, Marital Status, Gender, Type of Education.

In the described scenario we don’t have prior knowledge of whether a given feature is categorical or not. We would like to treat a feature as categorical as long as its value set is small enough. Moreover, if there are too many distinct values, we are no longer interested in its discrete distribution.

Seahorse is built on top of Apache Spark. A naive and easy approach to this problem would be to simply use Spark’s reduceByKey or groupByKey methods. The reduceByKey method operates on key-value pairs and accepts a function that reduces two values into one. If there are many values for one key, they get reduced to a single value with the provided function.
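A sketch of this naive approach might look as follows (the RDD of category values and the function name are our assumptions for illustration; the reduceByKey call is the point):

```scala
import org.apache.spark.rdd.RDD

// Naive approach: count occurrences per category with reduceByKey.
// Correct, but it forces a shuffle so that equal keys meet on one node.
def naiveDistribution(column: RDD[String]): Map[String, Long] =
  column
    .map(value => (value, 1L)) // key-value pairs: (category, 1)
    .reduceByKey(_ + _)        // merges counts per key across partitions
    .collect()
    .toMap
```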

Unfortunately, using these methods means poor performance due to Spark’s partition reshuffling.

Counting occurrences of specific values of a feature can be easily done without reshuffling. We might think of calculating a category distribution as counting occurrences of each category. This can be done using the RDD’s aggregate function.

The aggregate function reduces a dataset to one value using seq and comb functions and an initial aggregator value. The seq function adds one value to an aggregator; it is used to combine all values from a partition into one aggregated value. The comb function adds two aggregators together; it is used to combine results from different partitions.
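To make the seq/comb contract concrete, here is a plain-Scala simulation of aggregate over two hypothetical partitions (no Spark required; all names are ours):

```scala
// seqOp folds one element into a per-partition aggregator;
// combOp merges aggregators produced by different partitions.
// RDD.aggregate(zero)(seqOp, combOp) follows exactly this shape.
val partitions = Seq(Seq("a", "b", "a"), Seq("b", "c"))

val zero = Map.empty[String, Long]
val seqOp = (acc: Map[String, Long], v: String) =>
  acc.updated(v, acc.getOrElse(v, 0L) + 1L)
val combOp = (left: Map[String, Long], right: Map[String, Long]) =>
  right.foldLeft(left) { case (m, (k, v)) => m.updated(k, m.getOrElse(k, 0L) + v) }

val result = partitions.map(_.foldLeft(zero)(seqOp)).reduce(combOp)
// result == Map("a" -> 2, "b" -> 2, "c" -> 1)
```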

In our case the aggregator value will be of type Option[Map[T, Long]], representing occurrence counts for values of type T, or None if there are too many distinct categorical values.

The comb function (mergeCombiners in the code) adds two aggregator maps together, merging their results. If the resulting map has more distinct categorical values than the specified limit, no distribution is returned (a None).

The seq function (mergeValue in the code) increments the occurrence count of a value in the aggregator map.

Let the code speak for itself:
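The original listing is not reproduced here; below is a functional sketch matching the description above. The limit parameter and the helper names mergeValue/mergeCombiners follow the text; Scalaz’s |+| map merge is replaced with an explicit fold to keep the snippet dependency-free:

```scala
// Some(counts) while the number of distinct values fits within `limit`;
// None once there are too many categories.
type Distribution[T] = Option[Map[T, Long]]

// seq function: fold a single record into the aggregator.
def mergeValue[T](limit: Int)(acc: Distribution[T], value: T): Distribution[T] =
  acc.flatMap { counts =>
    val updated = counts.updated(value, counts.getOrElse(value, 0L) + 1L)
    if (updated.size > limit) None else Some(updated)
  }

// comb function: merge two partition-level aggregators.
// With Scalaz this merge is simply `left |+| right`.
def mergeCombiners[T](limit: Int)(a: Distribution[T], b: Distribution[T]): Distribution[T] =
  (a, b) match {
    case (Some(left), Some(right)) =>
      val merged = right.foldLeft(left) { case (m, (k, v)) =>
        m.updated(k, m.getOrElse(k, 0L) + v)
      }
      if (merged.size > limit) None else Some(merged)
    case _ => None
  }

// On an RDD this runs in a single stage, with no shuffle:
// rdd.aggregate(Option(Map.empty[T, Long]))(mergeValue(limit), mergeCombiners(limit))
```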

The code above is clean, functional and takes advantage of Scalaz. Unfortunately, it allocates a new object for each record in the dataset. With Apache Spark, it is sometimes better to favour mutable objects over immutable ones for performance’s sake. According to the documentation, both the seq and comb functions are allowed to modify and return their first argument instead of creating a new object, which avoids memory allocation. The next approach takes advantage of that. The code below is not as pretty, but its performance is much better.
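A sketch of the mutable variant under the same assumptions (the functions mutate and return their first argument, which Spark’s aggregate documentation explicitly permits):

```scala
import scala.collection.mutable

type MutableDistribution[T] = Option[mutable.Map[T, Long]]

// seq function: mutate the per-partition map in place; no allocation per record.
def mergeValue[T](limit: Int)(acc: MutableDistribution[T], value: T): MutableDistribution[T] =
  acc match {
    case Some(counts) =>
      counts(value) = counts.getOrElse(value, 0L) + 1L
      if (counts.size > limit) None else acc
    case None => None
  }

// comb function: drain the right map into the left one.
def mergeCombiners[T](limit: Int)(a: MutableDistribution[T], b: MutableDistribution[T]): MutableDistribution[T] =
  (a, b) match {
    case (Some(left), Some(right)) =>
      right.foreach { case (k, v) => left(k) = left.getOrElse(k, 0L) + v }
      if (left.size > limit) None else a
    case _ => None
  }

// Spark serializes the zero value to each partition, so a mutable map is safe here:
// rdd.aggregate(Option(mutable.Map.empty[T, Long]))(mergeValue(limit), mergeCombiners(limit))
```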

 

Measurements

Our hypothesis was that the last approach should be at least twice as fast: the group-by operation works in two stages with a partition reshuffle between them, while our solution runs in a single stage.

A simple experiment with Apache Spark seems to confirm that. For the experiment we used:

  • a local Apache Spark cluster,
  • a data set containing 10,000,000 rows,
  • with 33 labels in the category column (uniformly distributed).

In the experiment we execute an empty foreach statement first to force Spark to cache the data in the cluster (job 0). Otherwise the first measured job would carry the additional overhead of reading the data.

Jobs 1 and 2 make up the first, naive solution: job 1 counts the distinct categories, and job 2 calculates the distribution.

Jobs 3 and 4 represent the two optimised solutions. Job 3 is the functional one; job 4 is the one that takes advantage of mutable objects.

 

Spark jobs

Job 1 (distinct count) and job 2 (reduceByKey) have 400 tasks each and use memory to perform shuffling.

Jobs 3 and 4 have only 200 tasks each, run in one stage and don’t use additional memory to perform shuffling. They also check the category count dynamically.

An interesting observation is that the functional job 3 is slower than the reduceByKey job even though it has only 200 tasks. The memory allocation overhead is that severe!

Summary

Categorical features are very common in data science. With the proposed approach we can calculate accurate categorical distributions quickly and without prior knowledge of whether a feature is categorical or not. The computation is guaranteed to execute in a single Apache Spark job, without any partition reshuffling.
