Site icon Blog ARC Optimizer

PySpark – Normes de codage et bonnes pratiques


Objectif:

L’objectif principal de ce document est de sensibiliser et d’établir une compréhension claire des normes de codage et des meilleures pratiques à respecter lors du développement de composants PySpark. Les meilleures pratiques sont toute procédure acceptée comme étant la plus efficace, soit par consensus, soit par prescription. Les pratiques peuvent aller des méthodologies stylistiques aux méthodologies de conception approfondies.

Dans une tentative de documenter les meilleures pratiques pour Databricks, il devient essentiel de couvrir certaines des meilleures pratiques Python clés. Alors, commençons ici par les normes générales de Python et approfondissons éventuellement PySpark.

Disposition du code :

Utilisez 4 espaces pour chaque niveau d’indentation. Les lignes de continuation doivent aligner les éléments enveloppés verticalement comme un retrait suspendu. Lors de l’utilisation d’un retrait suspendu, il ne doit y avoir aucun argument dans la première ligne et une indentation supplémentaire doit être utilisée pour le distinguer clairement en tant que ligne de continuation.

# CORRECT:

# Aligned with opening delimiter
foo = function_name(var_one, var_two,
                    var_three, var_four)

# Add 4 spaces and an extra level of indentation to distinguish the arguments with the rest
def function_name(
        var_one, var_two,
        var_three, var_four):
    print(var_one)

# Hanging indent should add a level
foo = function_name(
    var_one, var_two,
    var_three, var_four)

# WRONG:

# Arguments on first line forbidden when not using vertical alignment.
foo = long_function_name(var_one, var_two,
    var_three, var_four)

# Further indentation required as indentation is not distinguishable.
def long_function_name(
    var_one, var_two, var_three,
    var_four):
    print(var_one)
  • Ajout de deux lignes vides avant les fonctions et les classes
    • La fonction de niveau supérieur et les classes sont séparées par deux lignes vides
    • Les définitions de méthode à l’intérieur de la classe doivent être séparées par une ligne vide
    • Des lignes vierges supplémentaires peuvent être utilisées avec parcimonie pour séparer un groupe de fonctions connexes
    • Utilisez les lignes vides dans les fonctions avec parcimonie, pour indiquer les sections logiques
# Top level function and class are separated with two blank lines
# Function definitions within a class are separated with single blank line
class MyParentClass:

    def function_one(var_one, var_two):
        print(var_one)
        print(var_two)

    def function_two(var_one, var_two):
        print(var_one + var_two)
  • Limitation des longueurs de ligne
    • Limitez toutes les lignes à un maximum de 79 caractères
    • Pour les longs blocs de texte fluides avec moins de restrictions structurelles (docstrings ou commentaires), la longueur de la ligne doit être limitée à 72 caractères
  • Importations
    • Les importations doivent généralement figurer sur des lignes distinctes
    • Les importations de caractères génériques doivent être évitées, car elles ne donneront pas une image claire des noms présents dans l’espace de noms
# Correct:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import sys
import os

# Wrong:
import sys, os
from pyspark.sql import *

Conventions de nommage

  • N’utilisez pas ‘l’, ‘O’, ‘I’ comme nom de variable unique. Comme tous ces caractères ressemblent aux chiffres 1 et 0 dans certaines polices, cela peut prêter à confusion.
  • Généralement, il est recommandé d’utiliser des noms courts. Dans certains cas, des traits de soulignement peuvent être utilisés pour une meilleure lisibilité
  • Préfixer un seul trait de soulignement (_) prend en charge la protection des variables de module
  • Préfixer un double trait de soulignement (__) à une instance ou à une méthode la rend privée à sa classe
Taper Convention de dénomination Exemple
Fonction Utiliser des variables en minuscules séparées par des traits de soulignement mafonction, ma_fonction
Variable Utilisez des lettres minuscules ou un mot, ou un mot séparé par des traits de soulignement x, ma_variable
Classer Utilisez la casse Pascal. Gardez la première lettre de chaque phrase en majuscule. Ne séparez pas les mots par des traits de soulignement MyClass, ProcessCustomerData
Méthode Utilisez des mots en minuscules séparés par des traits de soulignement get_customer_data
Constant Utilisez une lettre majuscule, un mot ou des mots séparés par un trait de soulignement X, LOCATION, LOCATION_COUNT
Module Utilisez des mots courts en minuscules séparés par des traits de soulignement support_module.py
Forfait Utilisez des mots courts en minuscules sans traits de soulignement module de support, monpaquet

Recommandations générales

  • Comparer des singletons
    • Utilisez ‘est’ lors de la comparaison de singletons
    • Utilisez ‘n’est pas’ au lieu de ‘n’est pas… est’
# Correct:
if foo is not None:
    do_something()

# Wrong:
if foo == None:
    do_something()

# Wrong:
if not foo is None:
    do_something()

Utilisez toujours l’instruction ‘def’ au lieu d’une instruction d’affectation pour les expressions anonymes (lambda).

# Correct:
def multiply(x):
    return 2*x

# Wrong:
f = lambda x: 2*x
  • Dériver des exceptions
    • Dériver les exceptions de ‘Exception’ au lieu de ‘BaseException’
    • Utilisez autant que possible la capture d’exception explicite. Évitez la capture d’exception implicite.
    • Gardez la logique de la section ‘essayer’ aussi simple que possible
# Correct
try:
    import platform_specific_module

except ImportError:
    platform_specific_module = None

else:
    do_something()


# Wrong:
try:
    import platform_specific_module
    do_something()
except ImportError:
    platform_specific_module = None
  • Comparaison booléenne
    • Les booléens sont déjà des booléens. Ils n’ont pas besoin de comparaisons.
    • Pour les séquences (par exemple, les listes), utilisez le fait que les séquences vides représentent de faux
# Correct:
if is_active_customer:
    do_something()

# Wrong:
if is_active_customer == True:
    do_something()

# Wrong:
if is_active_customer is True:
    do_something()

# Wrong: If the list is empty, it represents FALSE. So, no need to check the length of the list
if len(customer_list) != 0:
    do_something()

Databricks – Bonnes pratiques

  • Évitez les déclarations imprimées. Utiliser le module de journalisation.
  • Garantir la réutilisabilité des modules de code dans l’ensemble du fichier. Utilisez des composants réutilisables existants au lieu de créer de nouvelles fonctions de manière redondante.
  • Lors de l’utilisation de la fonction récursive avec Spark, assurez-vous qu’elle dispose de l’instruction break appropriée. Sinon, cela entraînera une surutilisation des ressources
  • Les noms d’utilisateur, les mots de passe et les noms d’hôte ne doivent pas être conservés dans un fichier python direct ou un bloc-notes. Les informations sensibles doivent être gérées dans un coffre-fort sécurisé et référencées dans un fichier python ou un cahier à l’aide de clés.
  • Lors de l’utilisation d’instructions SQL, il est recommandé d’affecter l’instruction SQL à une variable. Et utilisez la variable dans l’API Spark SQL.
# Correct:
sql_query = 'SELECT col_1, col_2, col_3 FROM table'
df_data = spark.sql(sql_query)

# Bad:
df_data = spark.sql('SELECT col_1, col_2, col_3 FROM table')
  • Refactoriser l’enchaînement complexe d’expressions. Il est recommandé d’appliquer des expressions multilignes avec différents types, surtout si elles ont des comportements et des contextes différents. Par exemple, mélangez la création ou la jonction de colonnes avec la sélection et le filtrage.
# Bad:
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

# Better (separating into steps):
# Step 1: we select and trim down the data that we need
# Step 2: we create the columns that we need to have
# Step 3: joining with other dataframes
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)
df = df.withColumn('boverc', F.col('b') / F.col('c'))

df = (
    df
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)
  • Utilisez l’instruction select pour spécifier un contrat de schéma. Faire une sélection au début de la transformation, ou avant de revenir, est considéré comme une bonne pratique. Toute sélection doit être considérée comme une opération de nettoyage qui prépare la trame de données pour la consommation par l’étape suivante de la transformation.

Gardez les instructions de sélection aussi simples que possible. En raison des idiomes SQL courants, autorisez uniquement une fonction de spark.sql.function à utiliser par colonne sélectionnée, plus un .alias() facultatif pour lui donner un nom significatif.

Les expressions impliquant plus d’une trame de données ou les opérations conditionnelles telles que .when() ne sont pas recommandées dans une sélection, sauf si cela est nécessaire pour des raisons de performances.

# Good:   
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    'aircraft_type',
    'operator_code',
    F.col('aircraft_registration').alias('registration'),
    F.col('number_of_economy_seats').cast('long'),
    F.col('number_of_business_seats').cast('long'),
    F.avg('staleness').alias('avg_staleness'),
    F.avg('flight_hours').alias('avg_flight_hours'),
)

# Bad:
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    F.col('aircraft_registration').alias('registration'),
    'aircraft_type',
    F.avg('staleness').alias('avg_staleness'),
    F.col('number_of_economy_seats').cast('long'),
    F.avg('flight_hours').alias('avg_flight_hours'),
    'operator_code',
    F.col('number_of_business_seats').cast('long'),
)
  • Au lieu d’utiliser withColumnRenamed(), utilisez des alias. De plus, au lieu d’utiliser withColumn() pour redéfinir le type, lancez-le dans le select.
# Good:
df.select('key', F.col('comments').alias('num_comments'))

# Good:
df.select(F.col('comments').cast('double'))

# Bad:
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# Bad:
df.select('comments').withColumn('comments', F.col('comments').cast('double'))
  • Aux endroits où une colonne vide doit être ajoutée pour satisfaire le schéma, utilisez toujours la fonction F.lit(None) pour remplir la colonne vide. N’utilisez jamais une chaîne vide ou une autre valeur représentant un vide comme « NA », « N/A », « Nil ».

Bien qu’il soit sémantiquement correct de l’utiliser, la principale raison de recommander d’utiliser F.lit(None) est de préserver la possibilité d’utiliser des utilitaires comme isNull, au lieu de vérifier une chaîne vide, « NA », « N/A »,  » Néant ».

# Good:
df = df.withColumn('foo', F.lit(None))

# Bad:
df = df.withColumn('foo', F.lit(''))

# Bad:
df = df.withColumn('foo', F.lit('NA'))
  • Fonctions définies par l’utilisateur (UDF)

Il est fortement recommandé d’éviter les UDF dans toutes les situations, car elles sont moins performantes que PySpark natif. Dans la plupart des cas, la logique qui nécessite une UDF peut être refactorisée pour obtenir de l’UDF et utiliser uniquement l’API PySpark native.

Il est recommandé d’être plus prudent lors de l’utilisation de jointures. Par exemple, lorsque nous effectuons une jointure à gauche et que la table de droite a plusieurs correspondances pour une clé, la ligne sera dupliquée autant de fois qu’il y a de correspondances. Cela aura un impact important sur la sortie du travail de transformation.

Spécifiez toujours explicitement le mot-clé how, même si vous effectuez une jointure interne par défaut.

# Good:
telemetry = telemetry.join(sensor_data, 'vehicle_id', how='inner')

# Bad:
telemetry = telemetry.join(sensor_data, 'vehicle_id')

# Bad:
telemetry = telemetry.join(sensor_data, 'vehicle_id', 'inner')

Évitez d’utiliser la jointure à droite. Si vous êtes sur le point d’utiliser une jointure à droite, modifiez l’ordre des dataframes de manière à utiliser une jointure à gauche à la place. C’est plus intuitif puisque la base de données sur laquelle vous effectuez l’opération est celle autour de laquelle vous effectuez votre jointure.

# Good:
telemetry = telemetry.join(sensors, on='vehicle_id', how='left')

# Bad:
sensors = sensors.join(telemetry, on='vehicle_id', how='right')
  • Table/dataframe de cache pour les tables réutilisables

cache() est une transformation Apache Spark qui peut être utilisée sur RDD, Dataframe ou Dataset lorsque vous effectuez plusieurs opérations avec cette entité (RDD/Dataframe/Dataset). Comme cache() est une opération de transformation, l’opération de mise en cache n’a lieu que lorsqu’une action Spark (compter, afficher, prendre ou écrire) est également effectuée sur la même trame de données, ensemble de données ou RDD en une seule action,

df1 = spark.read.csv(input_path_1)
df2 = spark.read.csv(input_path_2)
df1.cache()                                                 # Transformation - Cache Dataframe df1

joined_df = df1.join(df2, df1.id==df2.id, how='inner')      # Join Dataframe df1 & df2
filtered_df = joined_df.filter("id == 'ID100'")             # Filter the joined Dataframe df1 for id 'ID100'
df1.count()                                                 # Call count on the cached Dataframe df1
filtered_df.show()                                          # Show data out of filtered Dataframe filtered_df

Dans l’extrait ci-dessus, Dataframe df1 sera mis en cache dans la mémoire uniquement lorsque l’action df1.count() est exécutée. df1.cache() ne lance pas l’opération de mise en cache sur Dataframe df1.

df = spark.read.csv(input_file_path)
df.cache.take(10)                       # Calling take(10) on the dataframe, while caching it
df.count()                              # Call count() on the cached dataframe df

Dans l’extrait ci-dessus, Dataframe df sera mis en cache dans la mémoire lorsque l’action take(10) sera exécutée. Cependant, il y a un hic dans cette action qu’une seule partition sera mise en cache. C’est-à-dire que take(10) ne traite que 10 enregistrements et que la partition associée à ces 10 enregistrements ne sera mise en cache et les autres partitions ne seront pas mises en cache. En conséquence, la prochaine instruction df.count() créera à nouveau le dataframe df. Plutôt df.cache.count() mettra en cache les enregistrements de toutes les partitions. Par conséquent, il est recommandé d’utiliser df.cache.count() partout où les cas d’utilisation nécessitent de mettre en cache toutes les données.

# Recommended
df = spark.table('input_table_data')
df.cache.count()            # cache dataframe df
df.count()                  # call count return result from the cached dataframe

La comparaison de chaînes Databricks est sensible à la casse et ne peut pas comparer des chaînes avec une casse différente.

sql_query = "SELECT 'DAGSequence'='dagsequence' AS WithoutLowerUpper, \
    LOWER('DAGSequence')='dagsequence' AS WithLowerCase, \
    UPPER('DAGSequence')='DAGSEQUENCE' AS WithUpperCase"
df = spark.sql(sql_query)
df.show()

Résultat:

+———————+—————-+—————-+

|SansLowerUpper|AvecLowerCase|AvecUpperCase|

+———————+—————-+—————-+

| faux | vrai | vrai |

+———————+—————-+—————-+

Les tables delta dans Databricks prennent en charge le partitionnement qui améliore les performances. Vous pouvez partitionner par colonne si vous vous attendez à ce que les données de cette partition fassent au moins 1 Go. Si la cardinalité de la colonne est élevée, n’utilisez pas cette colonne pour le partitionnement. Par exemple, si vous partitionnez par ID utilisateur et qu’il existe 1 million d’ID utilisateur distincts, le partitionnement augmenterait le temps de chargement de la table. Exemple:

CREATE TABLE weather(
    weather_date DATE,
    location STRING,
    location_type STRING,
    temperature DOUBLE
) USING delta PARTITIONED BY (location)

(or)

CREATE TABLE weather(
    location_type STRING,
    temperature DOUBLE
) PARTITIONED BY (weather_date DATE, 
                  location STRING)
  • Performances de Delta Lake en utilisant OPTIMIZE avec ZORDER

Le Z-Ordering est une approche permettant de regrouper des informations connexes dans le même ensemble de fichiers. La technique de co-localité est automatiquement appliquée par des algorithmes de saut de données dans Delta Lake sur Databricks, pour réduire considérablement la quantité de données à lire. Pour les données Z-Order, spécifiez les colonnes à trier dans la clause ZORDER BY.

OPTIMIZE events
WHERE date > current_timestamp() - INTERVAL 1 day
ZORDER BY (event_type)
  • Utilisation des conseils de répartition pour équilibrer les partitions

Vous trouverez ci-dessous les différents types d’indices de partitionnement,

SE FONDRE Réduisez le nombre de partitions au nombre de partitions spécifié. Il prend un numéro de partition en paramètre
REPARTITION Répartitionner le nombre spécifié de partitions à l’aide des expressions de partitionnement spécifiées. Il prend le numéro de partition, les noms de colonne ou les deux comme paramètres
REPARTITION_BY_RANGE Répartitionner le nombre spécifié de partitions à l’aide des expressions de partitionnement spécifiées. Il prend les noms de colonne et un numéro de partition facultatif comme paramètres
RÉÉQUILIBRER L’indicateur REBALANCE peut être utilisé pour rééquilibrer les partitions de sortie des résultats de la requête, de sorte que chaque partition ait une taille raisonnable. Il peut prendre des noms de colonnes comme paramètres. Cette astuce sera utile lorsque nous aurons besoin d’écrire le résultat de la requête dans une table, pour éviter des fichiers trop petits/gros. Cet indice est ignoré si AQE (Adaptive Query Execution) n’est pas activé.
  • Supprimer la table temporaire après l’exécution du notebook

Supprimez les tables temporaires qui ont été créées en tant que tables intermédiaires lors de l’exécution du bloc-notes. La suppression de tableaux permet d’économiser de l’espace de stockage, en particulier si le bloc-notes est planifié quotidiennement.

spark.catalog.dropTempView('temp_view_name')

spark.sql('drop view temp_view_name')
  • Utiliser des vues lors de la création de tables intermédiaires

Si nous devons créer des tables intermédiaires, utilisez des vues pour minimiser l’utilisation du stockage et réduire les coûts. Les vues sont orientées session et supprimeront automatiquement les tables du stockage après l’exécution de la requête. Pour des performances de requête optimales, n’utilisez pas de jointures ou de sous-requêtes dans les vues

df.createOrReplaceTempView('tmp_research_data')         # this view will be available until active user session

df.createOrReplaceGlobalTempView('gv_research_data')    # this view will be available until active spark session (cluster)






Source link
Quitter la version mobile