SALTING & Broadcast Join in Spark


Source: https://www.linkedin.com/pulse/salting-broadcast-join-spark-arabinda-mohapatra-okhvc?trackingId=XFuzrPv8SFqHvSecoJC58A%3D%3D




  • Salting: Use when you have skewed data and need to balance the load across partitions.
  • Broadcast Hash Join: Use when one of the datasets is small enough to be broadcast, typically smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default); see the configuration sketch below.
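
The broadcast threshold is a session configuration, so it can be inspected and adjusted at runtime. A minimal sketch, assuming an existing SparkSession named spark; the 50 MB value is only an illustrative choice:

# Inspect the current broadcast threshold (10 MB, i.e. 10485760 bytes, by default)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise it so a slightly larger lookup table still qualifies for a broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Setting it to -1 disables automatic broadcast joins altogether
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)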

Broadcast Hash Join

One approach to resolving data skew is the Broadcast Hash Join. If one of the tables is small enough to fit in the memory of every node, a broadcast hash join is an excellent option for resolving skew. It works because this type of join avoids a shuffle entirely: since the data is not re-partitioned based on the skewed values, there is no uneven distribution of data.

The INVOICE table is “broadcast” to all the nodes. There is no re-partitioning of data on the join column (invoice_id), as each node has a complete copy of the invoice table.


[Figure: Broadcast Join]

from pyspark.sql.functions import broadcast

# READ
event = spark.read.parquet('...').select(['event_id', 'invoice_id', 'session'])
invoice = spark.read.parquet('...').select(['invoice_id', 'sale_amt'])

# BROADCAST the smaller (invoice) table to every executor and join without a shuffle
join_condition = [event.invoice_id == invoice.invoice_id]
df_joined = event.join(broadcast(invoice), join_condition, 'left')
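
To confirm that the broadcast actually happened, inspect the physical plan; it should show a BroadcastHashJoin (with a BroadcastExchange) rather than a SortMergeJoin preceded by a shuffle.

# The physical plan should contain BroadcastHashJoin / BroadcastExchange
df_joined.explain()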


Key Salting

Sometimes you cannot avoid the join and both tables are too large to fit in memory. In such a scenario, one approach to avoiding data skew is to “salt” your datasets and then include the salt in the join condition.
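
Before salting, it is worth confirming which join keys are actually skewed. A minimal sketch, assuming the event DataFrame and invoice_id column from the example further below: counting rows per key and sorting in descending order makes the hot keys obvious.

from pyspark.sql.functions import desc

# Rows per join key; a handful of invoice_ids with huge counts indicates skew
event.groupBy("invoice_id").count().orderBy(desc("count")).show(10)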


[Figure: Salting]

Key salting is a technique used to optimize joins in distributed data systems by reducing data skew and improving performance. The process involves the following steps:

  1. Selecting a Range of Salt Values: A range of salt values is chosen. Larger salt ranges allow for better data distribution but also increase the size of the dataset after salting.
  2. Exploding the Lookup Table: The lookup table is expanded by generating multiple records for each original record, corresponding to the number of salt values. For example, if there are 5 salt values, each original record is duplicated 5 times with each copy having a different salt value.
  3. Distributing Salt in the Base Table: A salt value is added to the base table's records, ensuring that the salt values are evenly distributed.
  4. Performing the Join: The join is executed on both the join key and the salt value. This forces the data to be more evenly distributed across partitions, reducing data skew and improving the efficiency of the join operation.

from pyspark.sql.functions import array, explode, lit, monotonically_increasing_id

# READ
event = spark.read.parquet('...').select(['event_id', 'invoice_id', 'session'])
invoice = spark.read.parquet('...').select(['invoice_id', 'sale_amt'])

# Salt Range: this specifies how widely the data will be distributed.
# A greater number of salts improves the distribution, but it also increases
# the explosion of the lookup table, so a balance is necessary.
n = 5
salt_values = list(range(n))  # i.e. salt values = [0, 1, ..., 4]

# Explode the lookup dataframe with the salt.
# This generates n records for each original record, where n = len(salt_values).
invoice = invoice.withColumn("salt_values", array([lit(i) for i in salt_values]))
invoice = invoice.withColumn("_salt_", explode(invoice.salt_values)).drop("salt_values")

# Distribute the salt evenly in the base table
event = event.withColumn("_salt_", monotonically_increasing_id() % n)

# Salted Join: join on both the join key and the salt
join_condition = [event.invoice_id == invoice.invoice_id, event._salt_ == invoice._salt_]
df_joined = event.join(invoice, join_condition, 'left').drop("_salt_")
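
As a quick sanity-check sketch: before the temporary _salt_ column is dropped, each salt bucket should hold a roughly equal share of the rows on both sides, which is what spreads the formerly skewed key across partitions.

# Roughly equal counts per salt value indicate the rows are now spread out
event.groupBy("_salt_").count().show()
invoice.groupBy("_salt_").count().show()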


I will add more points to this in a future update.



