Optimize Spark with DISTRIBUTE BY & CLUSTER BY

The distribute by and cluster by clauses are really cool features of SparkSQL. Unfortunately, they remain relatively unknown to most users – this post aims to change that.

In order to gain the most from this post, you should have a basic understanding of how Spark works. In particular, you should know how it divides jobs into stages and tasks, and how it stores data on partitions. If you're not familiar with these subjects, this article may be a good starting point (besides the Spark documentation, of course).

Please note that this post was written with Spark 1.6 in mind.

Cluster by/Distribute by/Sort by

Spark lets you write queries in a SQL-like language – HiveQL. HiveQL offers special clauses that let you control the partitioning of data. This article explains how this works in Hive. But what happens if you use them in your SparkSQL queries? How does their behavior map to Spark concepts?

Distribute By

Repartitions a DataFrame by the given expressions. The number of partitions is equal to spark.sql.shuffle.partitions. Note that in Spark, when a DataFrame is partitioned by some expression, all the rows for which this expression is equal are on the same partition (but not necessarily vice-versa)!

This is how it looks in practice. Let’s say we have a DataFrame with two columns: key and value.
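
In SQL it could look like this (the original snippet isn't preserved here; we assume df is registered as a temporary table):

SELECT key, value FROM df DISTRIBUTE BY key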

Equivalent in DataFrame API:
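
A likely sketch (repartitioning by column expressions is available since Spark 1.6; in spark-shell the implicits needed for the $ syntax are already imported):

df.repartition($"key")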

Example of how it could work:

[Figure 1: an example of how distribute by could lay the rows out on partitions]

Sort By

Sorts data within partitions by the given expressions. Note that this operation does not cause any shuffle.

In SQL:
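
For example, with the same df as before:

SELECT key, value FROM df SORT BY key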

Equivalent in DataFrame API:
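
A likely sketch (sortWithinPartitions was added in Spark 1.6):

df.sortWithinPartitions($"key")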

Example of how it could work:

[Figure 2: an example of how sort by could order the rows within each partition]

Cluster By

This is just a shortcut for using distribute by and sort by together on the same set of expressions.

In SQL:
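
For example:

SELECT key, value FROM df CLUSTER BY key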

Equivalent in DataFrame API:
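
There is no dedicated method for cluster by in the Spark 1.6 DataFrame API, so presumably it is simply the two previous calls combined:

df.repartition($"key").sortWithinPartitions($"key")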

Example of how it could work:

[Figure 3: an example of how cluster by could partition and sort the rows]

When Are They Useful?

Why would you ever want to repartition your DataFrame? Well, there are multiple situations where you really should.

Skewed Data

Your DataFrame is skewed if most of its rows are located on a small number of partitions, while the majority of the partitions remain empty (or nearly so). You really should avoid such a situation. Why? It makes your application hardly parallel at all – most of the time you will be waiting for a single task to finish. Even worse, in some cases you can run out of memory on some executors, or cause an excessive spill of data to disk. All of this can happen if your data is not evenly distributed.

To deal with the skew, you can repartition your data using distribute by. For the expression to partition by, choose something that you know will evenly distribute the data. You can even use the primary key of the DataFrame!

For example:
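
A sketch of such a query, distributing by both columns of our example df (assuming the (key, value) pair uniquely identifies a row, i.e. acts as the primary key):

SELECT key, value FROM df DISTRIBUTE BY key, value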

could work like this:

[Figure 4: an example of how distributing by the primary key could fix the skew]

Note that distribute by does not guarantee that data will be distributed evenly between partitions! It all depends on the hash of the expression by which we distribute. In the example above, one can imagine that the hash of (1,b) was equal to the hash of (3,a). And even when the hashes for two rows differ, they can still end up on the same partition when there are fewer partitions than unique hashes! But in most cases, with bigger data samples, this trick can mitigate the skew. Of course, you have to make sure that the partitioning expression is not skewed itself (meaning that the expression is equal for most of the rows).

Multiple Joins

When you join two DataFrames, Spark will repartition them both by the join expressions. This means that if you are joining to the same DataFrame many times (by the same expressions each time), Spark will be doing the repartitioning of this DataFrame each time.

Let’s see it in an example.

Let’s open spark-shell and execute the following code.

First, let’s create some DataFrames to play with:
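
The exact snippet isn't preserved here, so below is a plausible reconstruction (the names df, df1 and df2 match the discussion that follows; spark-shell already provides sc, sqlContext and the implicits):

val df = sc.parallelize(0 until 1000000).map(x => (x % 100, x)).toDF("key", "value").cache()
val df1 = sc.parallelize(0 until 1000).map(x => (x % 100, x)).toDF("key", "value1")
val df2 = sc.parallelize(0 until 1000).map(x => (x % 100, x)).toDF("key", "value2")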

While performing the join, if one of the DataFrames is small enough, Spark will perform a broadcast join. This is actually a pretty cool feature, but it is a subject for another blog post. Right now, we are interested in Spark's behavior during a standard join. That's why – for the sake of the experiment – we'll turn off the auto-broadcasting feature with the following line:
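
In Spark 1.6 this can be done through the SQLContext; a threshold of -1 disables automatic broadcast joins:

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")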

Ok, now we are ready to run some joins!
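
For example (the counts serve only as actions that trigger the jobs):

df.count()                  // job 1: computes df and populates the cache
df.join(df1, "key").count() // job 2
df.join(df2, "key").count() // job 3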

Let’s see how it looks in SparkUI (for spark-shell it usually starts on localhost:4040). Three jobs were executed. Their DAGs look like this:

The first job just creates df and caches it. The second one creates df1, loads df from the cache (this is indicated by the green dot) and then repartitions both of them by key. The third DAG is really similar to the second one, but uses df2 instead of df1. So it is clear that we repartitioned df by key twice.

How can this be optimised? The answer is: you can repartition the DataFrame yourself, only once, at the very beginning.
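
A sketch of the fix, with df_dist as the name for the repartitioned, cached copy:

val df_dist = df.repartition($"key").cache()
df_dist.count()                  // job 1: the additional repartitioning stage happens here
df_dist.join(df1, "key").count() // job 2: the repartitioned df_dist is read from the cache
df_dist.join(df2, "key").count() // job 3: same here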

This time the first job has an additional stage – we perform repartitioning by key there. But in both of the following jobs, one stage is skipped and the repartitioned DataFrame is taken from the cache – note that the green dot is in a different place now.

Sorting in Join

There is one thing I haven't told you about yet. Starting from version 1.2, Spark uses a sort-based shuffle by default (as opposed to a hash-based shuffle). So actually, when you join two DataFrames, Spark will repartition them both by the join expressions and sort them within the partitions! That means the code above can be further optimised by adding sort by to it:
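
For example:

val df_dist = df.repartition($"key").sortWithinPartitions($"key").cache()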

But as you now know, distribute by + sort by = cluster by, so the query can get even simpler!
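
With the SQL clause it could look like this (df has to be registered as a temporary table first):

df.registerTempTable("df")
val df_dist = sqlContext.sql("SELECT * FROM df CLUSTER BY key").cache()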

Multiple Joins on an Already Partitioned DataFrame

Ok, but what if the DataFrame that you will be joining to is already partitioned correctly? For example, if it is the result of grouping by the expressions that will be used in the join? Well, in that case you don't have to repartition it once again – a mere sort by will suffice.
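
A minimal sketch, assuming a hypothetical df_grouped that is already partitioned by key (e.g. the result of an earlier group by on key):

val df_ready = df_grouped.sortWithinPartitions($"key").cache() // df_grouped is hypothetical; no repartition needed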

In fact, adding an unnecessary distribute by can actually harm your program! In some cases, Spark won’t be able to see that the data is already partitioned and will repartition it twice. Of course, there is a possibility that this behaviour will change in future releases.

Final Thoughts

Writing Spark applications is easy, but making them optimal can be hard. Sometimes you have to understand what is going on underneath to be able to make your Spark application as fast as it can be. I hope that this post will help you achieve that, at least when it comes to distribute by and cluster by.

26 replies
  1. pete says:

    Every now and then you stumble upon a blog which justifies its existence and provides information in a clear and lucid format, and this is one of them. Bravo!!!

  2. Ven Govindarajan says:

    Thank you for the great article! What strategy do you recommend for a SQL statement involving multiple joins?
    For example:
    SELECT *
    FROM
    df_dist
    JOIN df1 ON df_dist.a = df1.a
    JOIN df2 ON df_dist.a = df2.a

    How does caching df_dist affect this join? Is caching even necessary since for the second join, the input would be the result of the first join?

    • Witold Jedrzejewski says:

      Hi,
      that’s a great question!
      In this case, if you are interested only in the final result of the two joins, I would recommend not caching anything.
      In general, there is no need for caching if you run only one Spark action.

  3. Ven Govindarajan says:

    Thank you! On further analysis with Spark 2.0.1, using “cluster by” has no effect. I am joining two large tables in sparksql. When loading into a dataframe, I used

    val rawDf1 = spark.read.parquet("file in hdfs")
    rawDf1.createOrReplaceTempView("df1")

    val rawDf2 = spark.read.parquet("file in hdfs")
    rawDf2.createOrReplaceTempView("df2")

    val resultDf = spark.sql("select * from df1 a inner join df2 b on a.key = b.key")

    Whether I use cluster by key or not, I still see the same query plan being generated by Spark.

    Any thoughts?

  4. Ven Govindarajan says:

    Correction to the above post:
    Thank you! On further analysis with Spark 2.0.1, using “cluster by” has no effect. I am joining two large tables in sparksql. When loading into a dataframe, I used

    val rawDf1 = spark.read.parquet("file in hdfs")
    rawDf1.createOrReplaceTempView("rawdf1")

    val rawDf2 = spark.read.parquet("file in hdfs")
    rawDf2.createOrReplaceTempView("rawdf2")

    val df1 = spark.sql("select * from rawdf1 cluster by key")
    df1.createOrReplaceTempView("df1")

    val df2 = spark.sql("select * from rawdf2 cluster by key")
    df2.createOrReplaceTempView("df2")

    val resultDf = spark.sql("select * from df1 a inner join df2 b on a.key = b.key")

    Whether I use cluster by key or not, I still see the same query plan being generated by Spark.
    Any thoughts?

    • Witold Jedrzejewski says:

      Hmm, interesting. This post was written with Spark 1.6 in mind. Perhaps something has changed with regard to “cluster by” in Spark 2.0.
      Another possibility is that your parquet files are already partitioned by “key”…

  5. Philip says:

    Thanks for this article!
    It helped me understand why a .repartition(key1, key2) that I used did not work as I expected, as you write:
    “…all the rows for which this expression is equal are on the same partition (but not necessarily vice-versa)!”
    Is there a way to force Spark to only put data with the same key-combination in one partition, thus creating a one-to-one relationship between keys-combinations and partitions?

    • Witold Jedrzejewski says:

      As far as I know, there is no easy way of achieving that in Spark.
      Perhaps you could share the broader context of your use case?
      It sounds like your approach might not be correct…

      • Philip says:

        Thanks for your answer!
        We have a Spark SQL DataFrame df containing rows belonging to about 150 different key-combinations. As our code is developed mostly in Pandas and a complete rewrite of it did not seem like an option, our first approach was to do something like

        df.repartition(key1, key2).rdd.mapPartitions(funcPartitions).moreOperations…

        where funcPartitions is a function like

        def funcPartitions(iter):
            dfPart = pd.DataFrame(list(iter))
            # do some computation/manipulation of dfPart here!
            return [dfPart] if len(dfPart) > 0 else []

        However, as there are multiple key-combinations in some partitions, it does not work as expected.

        • Witold Jedrzejewski says:

          In this case, the easiest solution would be to do everything with groupBy.
          It would look like this:

          df.rdd.map(lambda r: ((r[key1], r[key2]), r)).groupByKey().mapValues(funcIter).moreOperations…

          funcIter would do roughly what funcPartitions does. In this case, everything passed to funcIter will have the same key(s).

        • ashok says:

          Hey, how have you done that? I have a scenario where I have to save the data frame after partitioning by two columns, and these two columns are unique in the complete set. If I do df.write.partitionBy it creates millions of small files, which I want to avoid, but at the same time it has to create and preserve the partitioning while writing, so that downstream consuming apps don't have to shuffle it again. Many downstream apps will be using the same keys for joining and they don't want to go through the shuffle operation again.

  6. Varma says:

    Superb explanation, Witold Jedrzejewski. I tried applying the same to the example below and found that it did not work as expected. Can you please elaborate more on this, w.r.t. the example below?

    df1 -> Clustered by type, id, rand_val (a random value generated to evenly distribute data within the Spark partitions)
    df2 -> Clustered by proid, designation, conid
    df3 -> Clustered by type, id, conid

    Query Used for execution:
    select * from df1
    join df2 on concat_ws('|', df1.id, 'engineer') = concat_ws('|', df2.proid, df2.designation)
    left join df3 on (df3.type = df1.type and df3.id = df2.proid and df3.conid = df2.conid)

    df1 — Sample Data. Joining on the “id” column, which is skewed with the value “a”.
    Type Id Rand_val
    t a 1
    t a 1
    t a 2
    t a 2
    t b 1

    df2 — Sample Data
    proid designation conid
    a engineer 1
    a engineer 2
    a engineer 1
    a engineer 2
    c engineer 1

    df3 — Sample Data.
    type id conid
    t a 1
    t a 1
    t b 1

    I have checked the count of data in each partition of the dataframes (df1, df2, df3) and found these are evenly distributed. But when I am executing the above query, the data is again skewing up in one of the stages of the join. Can you please add some points on this?

    • Witold Jedrzejewski says:

      The problem is that you are joining on the ‘id’ column.
      The df1 may be evenly distributed before the join, but Spark has to partition it by the columns you are joining on in order to do the join; in this case it will partition it on concat_ws('|', df1.id, 'engineer').
      If you want to fight that skew, you need to join on something else: for example, you can add a random value to the join condition and then try to aggregate the results.
      In general, there are many techniques to fight the skew – most of them include some kind of salting similar to the idea above.

        • Witold Jedrzejewski says:

          Well, you could try out the broadcast join. In this approach the smaller dataframe is broadcast to all the executors and the join is executed as a narrow dependency. This means there will be no shuffle at all. To do that, one of the dataframes has to be small enough to fit in the executors' memory.
          This is implemented in Spark SQL.

          Other techniques, like the isolated join or the salted join, let you mitigate the skew during the shuffle phase. They are not implemented in Spark and you have to implement them manually. This is a subject for a different blog post, and it is covered pretty well all around the web 🙂 Just search for ‘skew’, ‘join’, ‘salt’, ‘isolated’ etc.

      • Varma says:

        Thanks for your reply. I just want to know: is there any way to avoid repartitioning the dataframes at the join level, so that the already partitioned data can be used in the join?

        • Witold Jedrzejewski says:

          Hm, as far as I know, not if the dataframe is partitioned by a different set of keys than the one used in the join condition. So not in the case you have described.
          But I encourage you to try out salting techniques or broadcast join (I mentioned it in one of my replies above).

  7. Twinkal Khanna says:

    I appreciate your work on Spark. It's such a wonderful read as a Spark tutorial. Keep sharing stuff like this. I am also educating people with a similar Spark tutorial, so if you are interested to know more you can watch this Spark training: https://www.youtube.com/watch?v=dMDQz82FCqE

