Fermer

juin 18, 2021

Plongez en profondeur dans Databricks Tempo pour l'analyse de séries chronologiques


Les données de séries chronologiques ont généralement été insérées de manière imparfaite dans la base de données que nous utilisions à l'époque pour d'autres tâches. Des bases de données de séries chronologiques (TSDB) arrivent sur le marché. Les TSDB sont optimisées pour stocker et récupérer les paires d'heures et de valeurs associées. L'architecture de TSDB se concentre sur le stockage de données d'horodatage et les compressions, la synthèse et la gestion du cycle de vie sont personnalisées pour cette structure. Habituellement, nous n'allons pas passer à un nouveau TSDB brillant; nous utiliserons simplement des fichiers Parquet. Databricks a publié un projet open source appelé Tempo qui simplifie la manipulation des séries temporelles dans Spark sur Parquet (en particulier Delta).

Que fait Tempo ?

Le point d'entrée est un objet TSDF (Time Series Data Frame). Les colonnes d'horodatage sont obligatoires. Les colonnes de partition et de séquence sont facultatives mais jouent un rôle dans un certain nombre de cas d'utilisation, y compris la fonctionnalité automatique. Avec une TSDB, vous disposez de certaines fonctions natives.

  • Asof joins – utilisez le fenêtrage pour sélectionner le dernier enregistrement d'une table source et fusionnez avec la table Fact de base.
  • Moyennes mobiles – incluent la moyenne mobile exponentielle approximative et simple Moyenne mobile
  • Rééchantillonnage – suréchantillonnage basé sur la fréquence et une fonction d'agrégation

En approfondissant le code, j'ai découvert que des fonctionnalités supplémentaires avaient été écrites qui utilisaient essentiellement la base du code pour répondre à un cas d'utilisation particulier.

Comment fonctionne Tempo. work?

Un coup d'œil à tsdf.py montre que vous devez fournir une colonne de données et d'horodatage et éventuellement une ou plusieurs colonnes de partition et une colonne de séquence. La colonne d'horodatage sera utilisée pour le tri. Les colonnes de partition facultatives et/ou les colonnes de séquence peuvent être utilisées pour la caractérisation. La colonne d'horodatage doit être une chaîne tandis que les colonnes de partition doivent être une chaîne ou une liste de chaînes.

Lorsque vous souhaitez écrire le TSDF dans une table Delta, vous appelez la fonction d'écriture, en passant le TSDF, le contexte d'étincelle, Nom de la table delta et colonnes d'optimisation (le cas échéant). Sous les couvertures, cela écrira dans une table delta.

view_df.write.mode("overwrite").partitionBy("event_dt").format('delta').saveAsTable(tabName)
Data Intelligence - L'avenir du Big Data
L'avenir du Big Data

Avec quelques conseils, vous pouvez créer une plate-forme de données adaptée aux besoins de votre organisation et tirer le meilleur parti de votre capital de données.

Obtenir le guide[19659016]Si vous utilisez des tables Delta open source sans le runtime Databricks, vous ne pourrez pas optimiser les performances à l'aide de Z-ordering.

useDeltaOpt = (os.getenv('DATABRICKS_RUNTIME_VERSION') ! = aucun)

si utiliserDeltaOpt :
     essayer:
        spark.sql("optimiser {} zorder par {}".format(tabName, "(" + ",".join(partitionCols + optimisationCols) + ")"))
     sauf:
        print("Tentatives d'optimisations delta sur une plate-forme non Databricks. Passez à l'utilisation de Databricks Runtime pour obtenir des avantages d'optimisation.")

(Il n'est pas difficile de voir comment vous pouvez simplement écrire dans un fichier parquet standard en modifiant éventuellement io.py ou même en ajoutant une autre définition d'écriture à tsdf.py.)

Asof

Il existe un Module asofJoin dans tsdf.py qui effectue une jointure asof entre deux TSDF dont les colonnes d'horodatage correspondent. Passer la trame de données actuelle ainsi que le tsdf droit est obligatoire, tandis que vous pouvez spécifier des préfixes pour les TSDF gauche et droit (le droit sera par défaut le préfixe « right ») pour éviter les noms de colonnes dupliqués. Vous avez la possibilité de fournir une valeur pour diviser les partitions en tranches de temps, ce qui peut limiter l'asymétrie et éviter les valeurs nulles s'il existe des valeurs en dehors de la période d'analyse maximale. La fraction de chevauchement est par défaut de 0,5. Un TSDF est créé à l'aide de la trame de données créée par la jointure asof (avec les données superposées et les colonnes superflues supprimées), la colonne d'horodatage commune et les colonnes partitionnées combinées et renvoyées.

Moving Averages

La moyenne mobile simple est utilisée pour calculer statistiques glissantes basées sur la colonne d'horodatage. Appelez withRangeStats pour calculer la moyenne/nombre/min/max/somme/écart standard/zscore sur toutes les colonnes numériques ou un ensemble de colonnes plus spécifique si fourni. La fenêtre arrière de plage par défaut est de 1000 secondes à partir du plancher de l'horodatage de l'événement de base. Les hypothèses suivantes sont formulées :

1. Les caractéristiques sont résumées sur une fenêtre glissante qui remonte à
2. La fenêtre arrière de gamme peut être spécifiée par l'utilisateur
3. Les numéros de séquence ne sont pas encore pris en charge pour le tri
4. Il y a un transtypage trop long à partir de l'horodatage, donc des microsecondes ou plus probablement des pannes – cela pourrait être plus facilement géré avec un horodatage de chaîne ou en triant l'horodatage lui-même. Si vous utilisez une fenêtre « lignes précédentes », ce ne serait pas un problème

La ​​convention de dénomination n'est pas très cohérente ici, vous calculez donc une moyenne mobile exponentielle en appelant EMA et en passant le TSDF et le nom de la colonne à calculer. Le calcul s'exécutera jusqu'à la fenêtre (30 par défaut).

Rééchantillonnage

Méthode pratique pour la conversion de fréquence et le rééchantillonnage des données de séries temporelles. Cela fournit des fonctionnalités similaires à pandas.DataFrame.Resample mais ce n'est pas une baisse de remplacement (contrairement à l'objectif de Koalas). Fournissez une colonne d'horodatage pour le tri et des colonnes à utiliser pour le partitionnement en séries temporelles plus granulaires pour le fenêtrage et le tri (h, min, sec) et transmettez une fonction d'agrégation. L'agrégation se fait dans resample.py.

Fonctionnalité supplémentaire

Si vous regardez la source de tsdf.py, vous trouverez une fonction appelée withLookbackFeatures.

Crée un tenseur de caractéristiques 2D adapté à l'entraînement un modèle ML pour prédire les valeurs actuelles à partir de l'historique d'un ensemble de caractéristiques. Cette fonction crée une nouvelle colonne contenant, pour chaque observation, un tableau 2-D des valeurs d'un certain nombre d'autres colonnes sur une fenêtre « regard en arrière » de l'observation précédente jusqu'à un certain nombre maximum d'observations passées.

Ceci est une petite fonction simple qui utilise les fonctionnalités privées BaseWindow et RowsBetweenWindows déjà présentes dans la base de code pour créer quelque chose de nouveau et d'utile. C'est un bon exemple de ce qui est possible en travaillant dans le code source en plus de ce que vous pouvez faire avec la bibliothèque elle-même.

Conclusion

Contrairement à Delta ou Koalas qui essaient pour apporter des changements fondamentaux pour le mieux, cette bibliothèque du Databricks Lab facilite simplement la gestion d'un cas d'utilisation particulier, mais courant. Tous les projets du Databricks Lab sont destinés à être utilisés sur la plate-forme d'analyse unifiée Databricks, mais le code source est utile en soi et peut parfois aider à accélérer le travail même en dehors du DUAP.

À propos de l'Auteur <!–:   dcallaghan, Solutions Architect–>

En tant qu'architecte de solutions chez Perficient, j'apporte vingt ans d'expérience en développement et je suis actuellement en contact avec Hadoop/Spark, blockchain et cloud, codage en Java, Scala et Go. Je suis certifié et travaille beaucoup avec Hadoop, Cassandra, Spark, AWS, MongoDB et Pentaho. Plus récemment, j'ai apporté des solutions de blockchain intégrées (en particulier Hyperledger et Ethereum) et de big data au cloud en mettant l'accent sur l'intégration de produits de données modernes tels que HBase, Cassandra et Neo4J en tant que référentiel hors blockchain.

cet auteur






Source link