Fermer

mars 6, 2024

Propriétés Spark SQL / Blogs / Perficient

Propriétés Spark SQL / Blogs / Perficient


Les propriétés spark.sql.* sont un ensemble d’options de configuration spécifiques à Spark SQL, un module d’Apache Spark conçu pour traiter des données structurées à l’aide de requêtes SQL, de l’API DataFrame et d’ensembles de données. Ces propriétés permettent aux utilisateurs de personnaliser divers aspects du comportement, des stratégies d’optimisation et de l’environnement d’exécution de Spark SQL. Voici une brève introduction à certaines propriétés courantes de spark.sql.* :

spark.sql.shuffle.partitions

La propriété spark.sql.shuffle.partitions dans Apache Spark détermine le nombre de partitions à utiliser lors du brassage des données lors d’opérations telles que des jointures ou des agrégations dans Spark SQL. La lecture aléatoire implique la redistribution et le regroupement des données entre partitions en fonction de certains critères, et le nombre de partitions affecte directement le parallélisme et l’utilisation des ressources au cours de ces opérations. Le comportement par défaut divise les DataFrames en 200 partitions uniques lors du brassage des données.

Syntaxe:

// Setting the number of shuffle partitions to 200
spark.conf.set("spark.sql.shuffle.partitions", "200")

spark.sql.autoBroadcastJoinThreshold

La propriété spark.sql.autoBroadcastJoinThreshold dans Apache Spark SQL détermine la taille du seuil au-delà de laquelle Spark SQL diffuse automatiquement des tables plus petites pour les opérations de jointure. La diffusion implique la réplication d’un DataFrame ou d’une table plus petite sur tous les nœuds exécuteurs pour éviter un brassage coûteux lors des opérations de jointure.

Syntaxe:

// Setting the autoBroadcastJoinThreshold to 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

spark.sql.execution.arrow.enabled

Dans Apache Spark SQL, le spark.sql.execution.arrow.enabled La propriété détermine si les transferts de données en colonnes basés sur des flèches sont activés pour les opérations DataFrame. Arrow est un format de données en mémoire en colonnes qui peut améliorer considérablement les performances de sérialisation et de désérialisation des données, conduisant à un traitement des données plus rapide.

Syntaxe:

// Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

spark.sql.sources.partitionOverwriteMode

La propriété spark.sql.sources.partitionOverwriteMode dans Apache Spark SQL détermine le mode d’écrasement des partitions lors de l’écriture de données dans des tables partitionnées. Cette propriété est particulièrement pertinente lors de la mise à jour de données existantes dans des tables partitionnées, car elle spécifie comment Spark doit gérer l’écrasement des répertoires de partition. Par défaut, partitionOverwriteMode sera Statique.

Syntaxe:

// Setting the partition overwrite mode to "dynamic"
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark.sql.statistiques.histogram.enabled

La propriété spark.sql.statistics.histogram.enabled dans Apache Spark SQL détermine si Spark SQL collecte des histogrammes pour le calcul des statistiques de données. Les histogrammes fournissent des informations supplémentaires sur la répartition des données dans les colonnes, ce qui peut aider l’optimiseur de requêtes à prendre de meilleures décisions d’exécution. Par défaut, la configuration est définie sur false.

Syntaxe:

// Enable collection of histograms for data statistics computation
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

spark.sql.streaming.schemaInference

La propriété spark.sql.streaming.schemaInference dans Apache Spark SQL détermine si l’inférence de schéma est activée pour le streaming de DataFrames. Lorsqu’il est activé, Spark SQL déduit automatiquement le schéma des sources de données de streaming pendant l’exécution, simplifiant ainsi le processus de développement en éliminant le besoin de spécifier manuellement le schéma.

Syntaxe:

// Enable schema inference for streaming DataFrames
spark.conf.set("spark.sql.streaming.schemaInference", "true")

spark.sql.adaptive.skewJoin.enabled

La propriété spark.sql.adaptive.skewJoin.enabled dans Apache Spark SQL détermine si l’exécution de requêtes adaptatives est activée pour l’optimisation des jointures asymétriques. Lorsqu’il est activé, Spark SQL détecte et atténue automatiquement l’asymétrie des données dans les opérations de jointure en ajustant dynamiquement la stratégie de jointure pour gérer plus efficacement les distributions de données asymétriques. Par défaut, la jointure asymétrique est True.

Syntaxe:

// Enable adaptive query execution for skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

spark.sql.inMemoryColumnarStorage.batchSize

La propriété spark.sql.inMemoryColumnarStorage.batchSize dans Apache Spark SQL configure la taille du lot pour la mise en cache en colonnes. Cette propriété définit le nombre de lignes traitées et stockées ensemble en mémoire lors des opérations de mise en cache en colonnes. Par défaut, la taille du lot est de 10 000.

Syntaxe:

// Setting the batch size for columnar caching to 1000 rows
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

spark.sql.adaptive.coalescePartitions.enabled

La propriété spark.sql.adaptive.coalescePartitions.enabled dans Apache Spark SQL détermine si la fusion adaptative de partitions est activée. Lorsqu’il est activé, Spark SQL ajuste dynamiquement le nombre de partitions pendant l’exécution de la requête pour optimiser l’utilisation des ressources et améliorer les performances.

Syntaxe:

// Enable adaptive partition coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Exemple

Voici un exemple illustrant l’utilisation de toutes les propriétés Spark SQL mentionnées avec une requête SQL :

// Importing necessary Spark classes
import org.apache.spark.sql.{SparkSession, DataFrame}

// Setting Spark SQL properties
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10 MB
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
spark.conf.set("spark.sql.streaming.schemaInference", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Creating DataFrames for the tables
val employeesData = Seq((1, "Aarthii", 1000), (2, "Gowtham", 1500), (3, "Saranya", 1200))
val departmentsData = Seq((1000, "HR"), (1200, "Engineering"), (1500, "Finance"))
val employeesDF = spark.createDataFrame(employeesData).toDF("emp_id", "emp_name", "dept_id")
val departmentsDF = spark.createDataFrame(departmentsData).toDF("dept_id", "dept_name")

// Registering DataFrames as temporary views
employeesDF.createOrReplaceTempView("employees")
departmentsDF.createOrReplaceTempView("departments")

// Executing a SQL query using the configured properties
val result = spark.sql(
"SELECT emp_name, dept_name FROM employees e JOIN departments d ON e.dept_id = d.dept_id"
)

// Showing the result
result.show()

SORTIR:

Propriétés Spark SQL

Dans cet exemple :

  • Nous importons les classes Spark nécessaires, notamment SparkSession et DataFrame.
  • Nous créons un objet SparkSession nommé spark.
  • Nous définissons diverses propriétés Spark SQL à l’aide de la méthode spark.conf.set().
  • Nous créons des DataFrames pour deux tables : « employés » et « départements ».
  • Nous enregistrons les DataFrames en tant que vues temporaires en utilisant createOrReplaceTempView().
  • Nous exécutons une requête de jointure SQL entre les tables « employés » et « départements » en utilisant spark.sql().
  • Enfin, nous affichons le résultat en utilisant show().

Ces propriétés fournissent un contrôle précis sur le comportement et les techniques d’optimisation de Spark SQL, permettant aux utilisateurs d’adapter les performances et les fonctionnalités des applications Spark SQL à des exigences et des cas d’utilisation spécifiques.

Référence: https://spark.apache.org/docs/latest/configuration.html






Source link