Fermer

août 5, 2024

Réduire la dette technique avec les tables système Databricks / Blogs / Perficient

Réduire la dette technique avec les tables système Databricks / Blogs / Perficient


Les tables système Databricks sont actuellement en préversion publique, ce qui signifie qu’elles sont accessibles mais certains détails peuvent encore changer. Voici comment Databricks décrit les tables système :

Les tables système sont un magasin analytique hébergé par Databricks des données opérationnelles de votre compte trouvées dans le system catalogue. Les tables système peuvent être utilisées pour l’observabilité historique sur votre compte.

Je vais décrire juste un exemple où les tables système ont pu me fournir une solution simple et prête à l’emploi pour remplacer une solution moins simple et moins prête à l’emploi. La possibilité de remplacer le code personnalisé de cette manière est importante lorsque vous essayez de créer un système stable, résilient et gérable. Ceci n’est qu’un petit exemple, mais imaginez à quel point la base de code d’une organisation pourrait être plus simple si vous recréez ce processus plusieurs dizaines de fois.

Le défi

Il existe un certain nombre de différentes tables système mais je veux me concentrer uniquement sur le tableaux de lignage: table_lineage et column_lineage. Plus précisément, je souhaite parler de la façon dont je les ai exploités pour simplifier considérablement un processus que j’utilisais pour surveiller un grand nombre de Part Delta les tables. Nous avions un client qui consommait environ 9 000 tables partagées auprès d’un fournisseur. Le fournisseur déposerait les tables dans l’emplacement de partage, puis une vue (avec les noms de colonnes pour les performances) serait créée dans un catalogue dans un espace de travail professionnel. Le fournisseur a déclaré qu’il fournirait une liste des changements chaque mois ; tables et/ou colonnes nouvelles ou supprimées. Cependant, le client a déclaré qu’il disposait d’un SLA d’une seule journée pour les modifications. L’entreprise voulait s’assurer qu’elle savait à l’avance qu’une tâche ou un rapport par rapport à une vue échouerait en raison de la suppression ou de la modification de la table sous-jacente. De plus, s’ils demandaient une nouvelle table, ils ne voulaient pas attendre un mois pour savoir si elle avait été livrée.

La solution (sans tables système)

Nous avons Catalogue Unity activé, car il s’agit d’une condition préalable à Delta Share. J’avais besoin de pouvoir surveiller les modifications de tous les objets sécurisables de la collection du fournisseur dans Delta Share, à savoir les schémas, les tables et les colonnes. Il n’y avait que quatre ou cinq schémas, de sorte que la surveillance pouvait presque être effectuée manuellement. Cependant, il y avait beaucoup de tableaux et ces tableaux avaient même beaucoup de colonnes. Je n’avais accès qu’à INFORMATION_SCHEMA pour chacun des schémas, afin de pouvoir interroger les noms de tables et de colonnes. L’idée était de conserver un hachage des noms de tables (ou de colonnes) ordonnés pour une comparaison rapide. (Je montre uniquement le code des tableaux ; le code de colonne est similaire.)

def get_tables_hash_and_names(spark):
   tables_query = """
      SELECT table_name 
      FROM xyz.information_schema.tables 
      WHERE data_source_format="DELTASHARING" 
      ORDER BY table_name DESC
    """
    tables_df = spark.sql(tables_query)
    table_names = sorted([row['table_name'] for row in tables_df.collect()])
    tables_str="|".join(table_names)
    tables_hash = hashlib.md5(tables_str.encode('utf-8')).hexdigest()
    return tables_hash, table_names
Intelligence des données - L'avenir du Big Data
L’avenir du Big Data

Avec quelques conseils, vous pouvez créer une plateforme de données adaptée aux besoins de votre organisation et tirer le meilleur parti de votre capital de données.

Obtenez le guide

S’il y a une divergence, effectuez une analyse plus approfondie pour identifier les changements. Cela signifie que l’état doit être sauvegardé pour comparer le jour en cours au jour précédent,

def save_schema_hashes(spark, current_schema_hashes): 
   current_timestamp = datetime.now() 
   records = [(table_name, schema_hash, current_timestamp) for table_name, schema_hash in current_schema_hashes.items()] 
   schema_hashes_df = spark.createDataFrame(records, ["table_name", "schema_hash", "timestamp"])  
   schema_hashes_df.write.format("delta").mode("append").saveAsTable("delta_schema_hashes")

Enfin, effectuer une comparaison rapide à l’aide de la valeur de hachage stockée dans les clés identifie les tables ou colonnes qui auraient pu changer, nécessitant la création d’une nouvelle vue.

def identify_schema_changes(current_schema_hashes, previous_schema_hashes):
   added_tables = set(current_schema_hashes.keys()) - set(previous_schema_hashes.keys())
   deleted_tables = set(previous_schema_hashes.keys()) - set(current_schema_hashes.keys())
   modified_tables = set()

   for table in (set(current_schema_hashes.keys()) & set(previous_schema_hashes.keys())):
      if current_schema_hashes[table “” not found /]
!= previous_schema_hashes[table “” not found /]
: modified_tables.add(table) return added_tables, deleted_tables, modified_tables

La solution (avec les tables système)

Fondamentalement, j’avais besoin de créer un magasin analytique des données opérationnelles du compte trouvées dans le system catalogue. Semble familier? Une fois que Databricks a fourni le lignage avec ses tables système, aucune de ces fonctionnalités n’était nécessaire. Désormais, tout est passé du code au SQL, réduisant considérablement la complexité de la solution. Voici une requête pour voir si une source existe dans la cible :

SELECT DISTINCT
target_table_name
FROM
system.access.table_lineage tl_outer
WHERE
target_table_catalog = '{catalog_name}' AND
target_table_schema="{schema_name}" AND
target_table_type="VIEW" AND
NOT EXISTS (SELECT 1
FROM system.access.table_lineage as tl_inner
WHERE
tl_outer.target_table_catalog = tl_inner.source_table_catalog AND
tl_outer.target_table_schema = tl_inner.source_table_schema AND
tl_outer.target_table_name = tl_inner.source_table_name)

Et voici comment vérifier si une table cible n’existe plus dans la source :

SELECT DISTINCT
source_table_name
FROM
system.access.table_lineage
WHERE
source_table_catalog = '{catalog_name}' AND
source_table_schema="{schema_name}" AND
(target_table_catalog IS NULL AND
target_table_schema IS NULL AND
target_table_name IS NULL)

Conclusion

Les vues peuvent être créées ou supprimées automatiquement avec ces informations. Il en va de même pour les colonnes. Si quelque chose n’est pas là, il est supprimé ou modifié et une nouvelle vue est créée automatiquement avec les nouvelles colonnes. Les équipes clientes étaient plus à l’aise avec SQL que pyspark. De plus, je ne stockais pas et ne maintenais pas la lignée dans une table pour laquelle le client devait payer. Dans l’ensemble, les tables système m’ont permis de revisiter et de simplifier le code personnalisé.

Entrer en contact avec nous si vous souhaitez en savoir plus sur la façon dont les tables système Databricks pourraient vous aider à gérer le lignage, à améliorer la gouvernance, à fournir des informations sur les coûts ou potentiellement d’autres cas d’utilisation.






Source link