
Как исправить перекосинг данных в Apache Spark с помощью техники поля
28 июня 2025 г.При работе с большими наборами данных вApache Spark, общая проблема с производительностьюДанные искажаютсяПолем Это происходит, когда несколько ключейдоминироватьраспределение данных, что приводит кнеровныйперегородки и медленные запросы. Это происходит в основном во время операций, которые требуютперетасовка, нравитьсяприсоединяетсяили даже обычныйагрегацииПолем
Практический способ уменьшения перекосапосол, который включает в себя искусственное распространение тяжелых ключей по нескольким перегородкам. В этом посте я проведу вам это с практическим примером.
Как солена решает проблемы с перекосом данных
ДобавивслучайноСгенерированное число к ключу соединения, а затем соединяя этот комбинированный ключ, мы можем более равномерно распределить большие клавиши. Это делает распределение данных более равномерным и распространяет нагрузку на большее количество работников, вместо того, чтобы отправлять большую часть данных одному работнику и оставлять остальных.
Преимущества соленника
Уменьшенный перекос:Равномерно распространяет данные по перегородкам, предотвращая перегрузку нескольких работников и улучшает использование.
Улучшенная производительность:Скорозащитные соединения и агрегации, балансируя рабочую нагрузку.
Избегает пребывания в ресурсах:Снижает риск ошибок вне памяти, вызванных большими неровными перегородками.
Когда использовать солинг
Во время соединений или агрегаций с искаженными ключами используйте солинг, когда вы заметите длительное время перетасовки или сбои исполнителя из -за искажения данных. Это также полезно в потоковых приложениях в реальном времени, где разделение влияет на эффективность обработки данных, или когда большинство работников простаивают, в то время как некоторые застряли в состоянии бега.
Пример солирования в скала
Давайте создадим некоторые данные снесбалансированКоличество рядов. Мы можем предположить, что есть два набора данных, которые мы должны присоединиться: один - большой набор данных, а другой - небольшой набор данных.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Давайте добавим столбец для соленой в большие наборы данных, которые мы используемрандомизацияЧтобы получить значения большого ключа в более мелкие перегородки
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
Чтобы убедиться, что мы рассмотрим все возможные рандомизированные соленые клавиши в больших наборах данных, нам нужновзорватьсяНебольшой набор данных со всеми возможными соленненными значениями
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Теперь мы можем легко присоединиться к двум наборам данных
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Пример поля в Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
Примечания
- Этот код использует
spark.range(...)
имитировать Скала(0 until numBuckets).toDF("salt")
Полем - Выражения столбцов обрабатываются с использованием
col(...)
Вlit(...)
, иconcat(...)
Полем - Актерский состав для целого числа использует
.cast(IntegerType())
Полем
Совет по настройке: выборnumBuckets
- Если вы установите
numBuckets = 100
, каждый ключ можно разделить на 100 подразделений. Тем не менее, будьте осторожны, потому что использование слишком большого количества ведер может снизить производительность, особенно для ключей с небольшими данными. Всегда тестируйте различные значения на основе перекоса профиля вашего набора данных. - Если вы знаете, как идентифицировать искаженные клавиши, то вы можете применить соленство только для этих клавиш и установить солирование для других клавиш как буквальные
0
, бывший.// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
Эмпирическое правило:Начните с малого (например, 10-20) и постепенно увеличиваются на основе наблюдаемых размеров перетасовки и времени выполнения задачи.
Последние мысли
Солить - это эффективный и простой метод управления перекосом в Apache Spark при традиционном разделении или подсказках (SKEWED JOIN
) недостаточны. При правильной настройке и мониторинге этот метод может значительно сократить время выполнения задания в высококачественных наборах данных.
Первоначально опубликовано в
Оригинал