Optimize Spark with DISTRIBUTE BY & CLUSTER BY

May 18, 2016 | Big data & Spark | by Witold Jędrzejewski

Distribute by and cluster by clauses are really cool features in SparkSQL. Unfortunately, this subject remains relatively unknown to most users – this post aims to change that.
In order to gain the most from this post, you should have a basic understanding of how Spark works. In particular, you should know how it divides jobs into stages and tasks, and how it stores data in partitions. If you’re not familiar with these subjects, this article may be a good starting point (besides the Spark documentation, of course).
Please note that this post was written with Spark 1.6 in mind.

Cluster by/Distribute by/Sort by

Spark lets you write queries in a SQL-like language – HiveQL. HiveQL offers special clauses that let you control the partitioning of data. This article explains how this works in Hive. But what happens if you use them in your SparkSQL queries? How does their behavior map to Spark concepts?

Distribute By

Repartitions a DataFrame by the given expressions. The number of partitions is equal to spark.sql.shuffle.partitions. Note that in Spark, when a DataFrame is partitioned by some expression, all the rows for which this expression is equal are on the same partition (but not necessarily vice-versa)!
This is how it looks in practice. Let’s say we have a DataFrame with two columns: key and value.

SET spark.sql.shuffle.partitions = 2
SELECT * FROM df DISTRIBUTE BY key

Equivalent in DataFrame API:

df.repartition(2, $"key")

Example of how it could work:
[Figure: example partitioning after DISTRIBUTE BY key]
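
To see this in practice, here is a minimal spark-shell sketch (Spark 1.6; the tiny DataFrame below is just for illustration) that prints which partition each row ends up on – rows sharing the same key always land on the same partition:

import sqlContext.implicits._
val sample = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c"), (3, "d"))).toDF("key", "value")
// Repartition into 2 partitions by the key column.
val byKey = sample.repartition(2, $"key")
// Tag every row with the index of the partition it landed on and print the result.
val tagged = byKey.rdd.mapPartitionsWithIndex((idx, rows) => rows.map(row => (idx, row)))
tagged.collect().foreach { case (idx, row) => println(s"partition $idx: $row") }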

Sort By

Sorts data within partitions by the given expressions. Note that this operation does not cause any shuffle.
In SQL:

SELECT * FROM df SORT BY key

Equivalent in DataFrame API:

df.sortWithinPartitions($"key")

Example of how it could work:
[Figure: example of data sorted within partitions by key]

Cluster By

This is just a shortcut for using distribute by and sort by together on the same set of expressions.
In SQL:

SET spark.sql.shuffle.partitions = 2
SELECT * FROM df CLUSTER BY key

Equivalent in DataFrame API:

df.repartition(2, $"key").sortWithinPartitions($"key")

Example of how it could work:
[Figure: example partitioning after CLUSTER BY key]

When Are They Useful?

Why would you ever want to repartition your DataFrame? Well, there are multiple situations where you really should.

Skewed Data

Your DataFrame is skewed if most of its rows are located on a small number of partitions, while the majority of the partitions remain empty. You really should avoid such a situation. Why? It makes your application virtually non-parallel – most of the time you will be waiting for a single task to finish. Even worse, in some cases you can run out of memory on some executors or cause an excessive spill of data to disk. All of this can happen if your data is not evenly distributed.
To deal with the skew, you can repartition your data using distribute by. For the expression to partition by, choose something that you know will evenly distribute the data. You can even use the primary key of the DataFrame!
For example:

SET spark.sql.shuffle.partitions = 5
SELECT * FROM df DISTRIBUTE BY key, value

could work like this:
[Figure: example partitioning after DISTRIBUTE BY key, value]
Note that distribute by does not guarantee that data will be distributed evenly between partitions! It all depends on the hash of the expression by which we distribute. In the example above, one can imagine that the hash of (1,b) was equal to the hash of (3,a). And even when hashes for two rows differ, they can still end up on the same partition, when there are fewer partitions than unique hashes! But in most cases, with bigger data samples, this trick can mitigate the skew. Of course, you have to make sure that the partitioning expression is not skewed itself (meaning that the expression is equal for most of the rows).
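
If you want to check how evenly the rows actually spread out, you can count them per partition – a rough spark-shell sketch (Spark 1.6), assuming df is registered as a table like in the example above:

// Count how many rows ended up on each partition after the repartitioning.
// A heavily uneven result means the chosen expression is still skewed.
val dist = sqlContext.sql("SELECT * FROM df DISTRIBUTE BY key, value")
val sizes = dist.rdd.mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
sizes.collect().foreach { case (idx, count) => println(s"partition $idx holds $count rows") }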

Multiple Joins

When you join two DataFrames, Spark will repartition them both by the join expressions. This means that if you join the same DataFrame many times (by the same expressions each time), Spark will repartition that DataFrame over and over again.
Let’s see this in an example.
Open spark-shell and execute the following code.
First, create some DataFrames to play with:

val data = for (key <- 1 to 1000000) yield (key, 1)
sc.parallelize(data).toDF("key", "value").registerTempTable("df")
import scala.util.Random
sc.parallelize(Random.shuffle(data)).toDF("key", "value").registerTempTable("df1")
sc.parallelize(Random.shuffle(data)).toDF("key", "value").registerTempTable("df2")

While performing a join, if one of the DataFrames is small enough, Spark will perform a broadcast join. This is actually a pretty cool feature, but it is a subject for another blog post. Right now, we are interested in Spark’s behavior during a standard join. That’s why – for the sake of the experiment – we’ll turn off the autobroadcast feature with the following line:

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

Ok, now we are ready to run some joins!

sqlContext.sql("CACHE TABLE df")
sqlContext.sql("SELECT * FROM df JOIN df1 ON df.key = df1.key").show
sqlContext.sql("SELECT * FROM df JOIN df2 ON df.key = df2.key").show

Let’s see how it looks in SparkUI (for spark-shell it usually starts on localhost:4040). Three jobs were executed. Their DAGs look like this:

[Figures: DAGs for Job 1, Job 2 and Job 3]

The first job just creates df and caches it. The second one creates df1, loads df from the cache (this is indicated by the green dot) and then repartitions both of them by key. The third DAG is very similar to the second one, but uses df2 instead of df1. So it is clear that df was repartitioned by key twice.
How can this be optimised? The answer is, you can repartition the DataFrame yourself, only once, at the very beginning.

val dfDist = sqlContext.sql("SELECT * FROM df DISTRIBUTE BY key")
dfDist.registerTempTable("df_dist")
sqlContext.sql("CACHE TABLE df_dist")
sqlContext.sql("SELECT * FROM df_dist JOIN df1 ON df_dist.key = df1.key").show
sqlContext.sql("SELECT * FROM df_dist JOIN df2 ON df_dist.key = df2.key").show
[Figures: DAGs for Job 1, Job 2 and Job 3 after pre-partitioning]

This time the first job has an additional stage – we perform the repartitioning by key there. But in both of the following jobs, one stage is skipped and the repartitioned DataFrame is taken from the cache – note that the green dot is in a different place now.

Sorting in Join

There is one thing I haven’t told you about yet. Starting from version 1.2, Spark uses a sort-based shuffle by default (as opposed to a hash-based shuffle). So actually, when you join two DataFrames, Spark will repartition them both by the join expressions and sort them within the partitions! That means the code above can be further optimised by adding sort by to it:

SELECT * FROM df DISTRIBUTE BY key SORT BY key

But as you now know, distribute by + sort by = cluster by, so the query can get even simpler!

SELECT * FROM df CLUSTER BY key
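
Putting this together with the caching trick from the previous section, the pre-partitioning step could look like this (a sketch using the same tables as before; the df_clust name is just illustrative):

val dfClust = sqlContext.sql("SELECT * FROM df CLUSTER BY key")
dfClust.registerTempTable("df_clust")
sqlContext.sql("CACHE TABLE df_clust")
// The cached copy is already repartitioned and sorted, so both joins can reuse it.
sqlContext.sql("SELECT * FROM df_clust JOIN df1 ON df_clust.key = df1.key").show
sqlContext.sql("SELECT * FROM df_clust JOIN df2 ON df_clust.key = df2.key").show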

Multiple Joins on an Already Partitioned DataFrame

Ok, but what if the DataFrame that you will be joining to is already partitioned correctly? For example, if it is the result of grouping by the expressions that will be used in the join? Well, in that case you don’t have to repartition it once again – a mere sort by will suffice.

val dfDist = sqlContext.sql("SELECT key, count(*) FROM some_other_df GROUP BY key SORT BY key")
dfDist.registerTempTable("df_dist")
sqlContext.sql("CACHE TABLE df_dist")
sqlContext.sql("SELECT * FROM df_dist JOIN df1 ON df_dist.key = df1.key").show
sqlContext.sql("SELECT * FROM df_dist JOIN df2 ON df_dist.key = df2.key").show

In fact, adding an unnecessary distribute by can actually harm your program! In some cases, Spark won’t be able to see that the data is already partitioned and will repartition it twice. Of course, there is a possibility that this behaviour will change in future releases.
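
If you are not sure whether a given query introduces an extra shuffle, you can print its physical plan and look for exchange operators (shown as TungstenExchange in Spark 1.6) – each one corresponds to a shuffle, so an unexpected one means redundant repartitioning. For example:

// Print the physical plan of the join; an extra exchange over df_dist would mean it was repartitioned again.
sqlContext.sql("SELECT * FROM df_dist JOIN df1 ON df_dist.key = df1.key").explain()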

Final Thoughts

Writing Spark applications is easy, but making them optimal can be hard. Sometimes you have to understand what is going on underneath to be able to make your Spark application as fast as it can be. I hope that this post will help you achieve that, at least when it comes to distribute by and cluster by.