Fermer

février 16, 2024

DBFS (système de fichiers Databricks) dans Apache Spark / Blogs / Perficient

DBFS (système de fichiers Databricks) dans Apache Spark / Blogs / Perficient


Dans le monde du traitement du Big Data, des systèmes de fichiers efficaces et évolutifs jouent un rôle crucial. L’un de ces systèmes de fichiers qui a gagné en popularité dans l’écosystème Apache Spark est DBFS, qui signifie Databricks File System. Dans cet article de blog, nous explorerons ce qu’est DBFS, comment il fonctionne et fournirons des exemples pour illustrer son utilisation.

Qu’est-ce que DBFS ?

DBFS est un système de fichiers distribué intégré à Databricks, une plateforme d’analyse unifiée conçue pour simplifier les tâches de traitement du Big Data et d’apprentissage automatique. Il s’appuie sur des systèmes de fichiers existants tels qu’Amazon S3, Azure Blob Storage et Hadoop HDFS, fournissant une couche d’abstraction et des fonctionnalités supplémentaires pour les applications Spark.

Comment fonctionne DBFS ?

DBFS fournit une interface unifiée pour accéder aux données stockées dans divers systèmes de stockage sous-jacents. Il utilise un concept appelé « points de montage » pour intégrer de manière transparente différents services de stockage dans une seule hiérarchie de système de fichiers. Cela permet aux utilisateurs d’interagir avec les données à l’aide de commandes de système de fichiers standard et d’API Spark sans se soucier du backend de stockage spécifique.

Exemples d’utilisation de DBFS :

Montage du stockage de données :

# Mounting Amazon S3 Bucket
dbutils.fs.mount(
  source = "s3a://your-s3-bucket/",
  mount_point = "/mnt/s3",
  extra_configs = {"<your-config-key>": "<your-config-value>"}
)

La fonction dbutils.fs.mount() est une fonction utilitaire Databricks que les utilisateurs utilisent pour monter des systèmes de stockage externes tels qu’Amazon S3, Azure Blob Storage, Google Cloud Storage, etc., sur DBFS. Le montage d’un système de stockage vous permet d’accéder aux fichiers de ce système de stockage à l’aide de chemins de fichiers DBFS standard.

Lecture des données :

# Reading data from DBFS
val data_df = spark.read.csv("dbfs:/FileStore/tables/Largest_earthquakes_by_year.csv")

Le code lira le fichier CSV spécifié dans un DataFrame nommé data_df, permettant un traitement et une analyse ultérieurs à l’aide de l’API DataFrame de Spark.

Sortir:

Lisez un CSV à l'aide de DBFS.

Écriture de données :

# Writing data to DBFS
data_df.write.parquet("dbfs:/FileStore/tables/earthquake_list.parquet")

Le code écrira le contenu du DataFrame data_df dans un fichier Parquet à l’emplacement spécifié. Les fichiers Parquet sont bien adaptés aux applications Spark car ils offrent un stockage efficace et des performances optimisées pour le traitement des données en colonnes.

Sortir:

CSV dans Parquet

Gestion des fichiers et des répertoires :

  • Lister les fichiers dans un répertoire
# List files in a directory
val files = dbutils.fs.ls("dbfs:/FileStore/tables")

Les fichiers variables contiendront une liste d’objets de métadonnées de fichier, où chaque objet représente un fichier ou un répertoire dans le chemin de répertoire spécifié. Les métadonnées du fichier incluent généralement des informations telles que le nom du fichier, sa taille, l’heure de modification et les autorisations. Cette liste peut être traitée ou analysée davantage si nécessaire dans votre bloc-notes ou application Databricks.

Sortir:

Liste des fichiers

# Create a directory
dbutils.fs.mkdirs("dbfs:/FileStore/tables/new_directory")

Un nouveau répertoire nommé new_directory sera créé dans le dbfs monté. Si le répertoire existe déjà, cette fonction ne générera pas d’erreur ; il reviendra simplement sans apporter aucune modification. Cette fonctionnalité vous permet de créer des répertoires en toute sécurité sans craindre d’écraser par inadvertance des répertoires existants.

Sortir:

Nouveau répertoire

# Delete a file
dbutils.fs.rm("dbfs:/FileStore/tables/Largest_earthquakes_by_year-2.csv")

Le fichier nommé file.csv situé au chemin spécifié sera supprimé du dbfs monté. Si le fichier n’existe pas au chemin spécifié, cette fonction ne générera pas d’erreur ; il reviendra simplement sans apporter aucune modification. Ce comportement vous permet de tenter en toute sécurité de supprimer des fichiers sans vous soucier des erreurs si le fichier n’existe pas.

# Delete a file 
dbutils.fs.rm("dbfs:/FileStore/tables/Largest_earthquakes_by_year-2.csv", "true")

Après avoir exécuté ce code, le fichier « dbfs:/FileStore/tables/Largest_earthquakes_by_year-2.csv » sera supprimé du système de fichiers Databricks. Si le fichier n’existe pas au chemin spécifié, l’appel de fonction sera renvoyé sans apporter de modifications.

#Delete a folder
dbutils.fs.rm("dbfs:/FileStore/tables/new",true)

Lorsqu’il est réglé sur true, cela signifie que si le chemin spécifié fait référence à un répertoire, la fonction supprimera récursivement le répertoire et tout son contenu. Si réglé sur falseil supprimera uniquement le fichier spécifié ou le répertoire vide sans récursion.

  • Copier un fichier d’un emplacement à un autre
#Copy a file from one location to another in DBFS
dbutils.fs.cp("dbfs:/FileStore/tables/Largest_earthquakes_by_year.csv", "dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csv")

Copie le fichier situé dans dbfs:/FileStore/tables/Largest_earthquakes_by_year.csv vers dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csv dans le dbfs monté.

  • Déplacer (renommer) un fichier dans DBFS
#Moving (renaming) a file in DBFS
dbutils.fs.mv("dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csv", "dbfs:/FileStore/tables/new/Largest_earthquakes_new_list.csv")

renomme le fichier situé dans dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csv en dbfs:/FileStore/tables/new/Largest_earthquakes_new_list.csv dans le dbfs monté.

  • Vérifier si un fichier existe dans DBFS
#Checking if a file exists in DBFS
val exists = dbutils.fs.ls("dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csv").nonEmpty
if (exists) {
  println("File exists.")
} else {
  println("File does not exist.")
}

Ce code vérifie si le fichier dbfs:/FileStore/tables/new/Largest_earthquakes_by_year.csvv existe dans le compartiment S3 monté. Si le fichier existe, il affiche « Le fichier existe. » ; sinon, il affiche « Le fichier n’existe pas ».

Sortir:

Contrôle de fichier

Optimisation des performances :

# Configure optimal file format for performance
spark.conf.set("spark.databricks.io.cache.enabled", "true")
spark.conf.set("spark.databricks.io.cache.maxDiskUsage", "50g")
spark.conf.set("spark.databricks.io.cache.timeout", "10m")
  1. spark.conf.set(« spark.databricks.io.cache.enabled », « true ») :
    • Cette configuration permet la mise en cache des opérations d’E/S Databricks.
    • Lorsqu’il est défini sur « true », le runtime Databricks utilisera des mécanismes de mise en cache pour améliorer les performances des opérations d’E/S, telles que la lecture de données à partir de systèmes de stockage externes comme Azure Blob Storage ou Amazon S3.
  2. spark.conf.set(« spark.databricks.io.cache.maxDiskUsage », « 50g ») :
    • Cette configuration définit l’utilisation maximale du disque autorisée pour la mise en cache dans le runtime Databricks.
    • Dans ce cas, réglez le paramètre sur « 50g », indiquant 50 gigaoctets.
    • Ce paramètre contrôle la quantité d’espace disque disponible pour la mise en cache des données.
    • Si les données mises en cache dépassent cette limite, le système peut supprimer les anciennes données mises en cache pour faire place aux nouvelles données.
  3. spark.conf.set(« spark.databricks.io.cache.timeout », « 10m ») :
    • Cette configuration définit le délai d’expiration des données mises en cache dans le runtime Databricks.
    • Dans ce cas, réglez-le sur « 10 m », indiquant 10 minutes.
    • Ce paramètre détermine la durée pendant laquelle les données mises en cache restent valides avant que le système ne les considère comme obsolètes et éventuellement les expulse du cache.
    • La définition d’un délai d’expiration permet de garantir que les données mises en cache restent à jour et reflètent toutes les mises à jour apportées à la source de données sous-jacente.

En configurant ces paramètres, vous pouvez affiner le comportement de mise en cache dans le runtime Databricks pour optimiser les performances et l’utilisation des ressources en fonction de votre charge de travail et de vos exigences spécifiques.

Conclusion:

DBFS simplifie la gestion des données et l’accès aux applications Spark en fournissant une interface de système de fichiers unifiée sur divers services de stockage. Il élimine les complexités liées à la gestion des différents backends de stockage, permettant aux ingénieurs de données et aux data scientists de se concentrer plus facilement sur leurs tâches d’analyse et d’apprentissage automatique. En utilisant DBFS efficacement, les organisations peuvent exploiter la puissance d’Apache Spark pour un traitement Big Data évolutif et efficace.

Dans cet article de blog, nous avons couvert les bases de DBFS ainsi que des exemples pratiques pour illustrer son utilisation dans les environnements Apache Spark. Que vous travailliez avec Amazon S3, Azure Blob Storage ou Hadoop HDFS, DBFS offre un moyen transparent et efficace d’interagir avec vos données, améliorant ainsi la productivité et les performances des flux de travail d’analyse Big Data.

Les références:

Documentation officielle Spark : Spark SQL et DataFrames – Documentation Spark 2.3.0 (apache.org)

Documentation officielle DBFS : Qu’est-ce que le système de fichiers Databricks (DBFS) ? | Databricks sur AWS






Source link