Как исправить перекосинг данных в Apache Spark с помощью техники поля

Как исправить перекосинг данных в Apache Spark с помощью техники поля

28 июня 2025 г.

При работе с большими наборами данных вApache Spark, общая проблема с производительностьюДанные искажаютсяПолем Это происходит, когда несколько ключейдоминироватьраспределение данных, что приводит кнеровныйперегородки и медленные запросы. Это происходит в основном во время операций, которые требуютперетасовка, нравитьсяприсоединяетсяили даже обычныйагрегацииПолем

Практический способ уменьшения перекосапосол, который включает в себя искусственное распространение тяжелых ключей по нескольким перегородкам. В этом посте я проведу вам это с практическим примером.

Как солена решает проблемы с перекосом данных

ДобавивслучайноСгенерированное число к ключу соединения, а затем соединяя этот комбинированный ключ, мы можем более равномерно распределить большие клавиши. Это делает распределение данных более равномерным и распространяет нагрузку на большее количество работников, вместо того, чтобы отправлять большую часть данных одному работнику и оставлять остальных.

Преимущества соленника

  • Уменьшенный перекос:Равномерно распространяет данные по перегородкам, предотвращая перегрузку нескольких работников и улучшает использование.

  • Улучшенная производительность:Скорозащитные соединения и агрегации, балансируя рабочую нагрузку.

  • Избегает пребывания в ресурсах:Снижает риск ошибок вне памяти, вызванных большими неровными перегородками.

Когда использовать солинг

Во время соединений или агрегаций с искаженными ключами используйте солинг, когда вы заметите длительное время перетасовки или сбои исполнителя из -за искажения данных. Это также полезно в потоковых приложениях в реальном времени, где разделение влияет на эффективность обработки данных, или когда большинство работников простаивают, в то время как некоторые застряли в состоянии бега.

Пример солирования в скала

Давайте создадим некоторые данные снесбалансированКоличество рядов. Мы можем предположить, что есть два набора данных, которые мы должны присоединиться: один - большой набор данных, а другой - небольшой набор данных.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")

Давайте добавим столбец для соленой в большие наборы данных, которые мы используемрандомизацияЧтобы получить значения большого ключа в более мелкие перегородки

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
    withColumn("salt", (rand() * numBuckets).cast("int")).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+

Чтобы убедиться, что мы рассмотрим все возможные рандомизированные соленые клавиши в больших наборах данных, нам нужновзорватьсяНебольшой набор данных со всеми возможными соленненными значениями

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
    crossJoin(smallDF).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+

Теперь мы можем легко присоединиться к двум наборам данных

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
    join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
    select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+

Пример поля в Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

# Simulated large dataset with skew
largeDF = spark.createDataFrame([
    (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])

# Small dataset
smallDF = spark.createDataFrame([
    (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])

# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
    saltedSmallDF,
    on=["salted_customer_id", "customer_id"],
    how="inner"
).select("customer_id", "transaction", "name")

Примечания

  • Этот код используетspark.range(...)имитировать Скала(0 until numBuckets).toDF("salt")Полем
  • Выражения столбцов обрабатываются с использованиемcol(...)Вlit(...), иconcat(...)Полем
  • Актерский состав для целого числа использует.cast(IntegerType())Полем

Совет по настройке: выборnumBuckets

  • Если вы установитеnumBuckets = 100, каждый ключ можно разделить на 100 подразделений. Тем не менее, будьте осторожны, потому что использование слишком большого количества ведер может снизить производительность, особенно для ключей с небольшими данными. Всегда тестируйте различные значения на основе перекоса профиля вашего набора данных.
  • Если вы знаете, как идентифицировать искаженные клавиши, то вы можете применить соленство только для этих клавиш и установить солирование для других клавиш как буквальные0, бывший.
    •     // Step 1: create a salting key in the large dataset
              val numBuckets = 3
              val saltedLargeDF = largeDF.
                  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
        
          // Step 2: Explode rows in smallDF for possible salted keys
              val saltedSmallDF = (0 until numBuckets).toDF("salt").
                  crossJoin(smallDF.filter($"customer_id" === 1)).
                  select("customer_id", "salt", "name").
                  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
      

Эмпирическое правило:Начните с малого (например, 10-20) и постепенно увеличиваются на основе наблюдаемых размеров перетасовки и времени выполнения задачи.


Последние мысли

Солить - это эффективный и простой метод управления перекосом в Apache Spark при традиционном разделении или подсказках (SKEWED JOIN) недостаточны. При правильной настройке и мониторинге этот метод может значительно сократить время выполнения задания в высококачественных наборах данных.

Первоначально опубликовано вhttps://practical-software.com11 мая 2025 года.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE