Fermer

avril 1, 2024

Ingestion de données boursières du NASDAQ dans Lakehouse via Notebook / Blogs / Perficient

Ingestion de données boursières du NASDAQ dans Lakehouse via Notebook / Blogs / Perficient


Arrière-plan

Microsoft Fabric apparaît comme une solution unique pour les aspects liés aux données. Avant l’introduction de Fabric, Power BI était confronté à peu de limitations liées à l’ingestion de données, car Power Query offre des fonctionnalités ETL et de transformation de données limitées. Les scripts du langage Power Query M manquent de facilité de développement, par rapport aux langages populaires tels que Java / C# / Python, etc., ce qui peut nécessiter des scénarios complexes. Lakehouse de Microsoft Fabric élimine cet inconvénient en fournissant la puissance d’Apache Spark, qui peut être utilisée dans les ordinateurs portables pour gérer des exigences complexes. Traditionnellement, les organisations fournissaient plusieurs services de services Azure, comme Azure Storage, Azure Databricks, etc. Fabric rassemble tous les services requis sur une seule plateforme.

Étude de cas

Une organisation de capital-investissement souhaite surveiller de près les actions dans lesquelles elle a investi pour ses clients. Ils souhaitent générer des tendances, des prédictions (à l’aide du ML) et analyser des données sur la base d’algorithmes développés par leur équipe de gestion de portefeuille en collaboration avec des data scientists écrits en Python. L’équipe de reporting souhaite consommer des données pour préparer les tableaux de bord, à l’aide de Power BI. L’organisation dispose d’un abonnement à l’API Market Data, qui peut extraire des données de marché en direct. Ces données doivent être ingérées en temps réel dans l’entrepôt, pour une utilisation ultérieure par l’équipe de data scientists et d’analystes de données.

Terminologies utilisées

Vous trouverez ci-dessous quelques termes utilisés dans le blog. Une meilleure compréhension de ceux-ci en visitant le site Web respectif est conseillée pour une meilleure compréhension :

  • Maison au bord du lac: En termes simples, il s’agit de l’entrepôt qui stockera les données non structurées telles que les fichiers CSV dans des dossiers et les données structurées, c’est-à-dire un tableau (au format Delta Lake). Pour en savoir plus sur Lakehouse, visitez le lien de la documentation officielle : https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-overview
  • Carnet de notes: C’est un endroit pour stocker notre code Python ainsi que la documentation à l’appui (au format Markdown). Visitez ce lien pour plus de détails sur Fabric Notebook : https://learn.microsoft.com/en-us/fabric/data-engineering/how-to-use-notebook
  • PySpark : Apache Spark est un moteur en mémoire pour l’analyse du Bigdata. Spark prend en charge des langages tels que Java / Scala / SQL / Python / R. PySpark est un SDK basé sur Python pour Spark. Plus d’informations sur Spark peuvent être trouvées sur le site officiel : https://spark.apache.org/
  • Modèle sémantique : L’ensemble de données Power BI est désormais renommé Modèle sémantique.
  • Facteur: Postman est un outil populaire principalement utilisé pour les tests d’API (édition gratuite à fonctionnalités limitées disponible). Postman propose une interface graphique pour effectuer des requêtes HTTP et inspecter leur réponse dans différents formats comme JSON/HTML, etc.
  • Polygone.io : Il s’agit d’une plate-forme de données de marché proposant une API pour interroger les cours des actions et les informations associées.

Représentation schématique

Vous trouverez ci-dessous l’organigramme pour vous aider à comprendre comment les composants Fabric sont liés les uns aux autres pour obtenir le résultat.

Représentation schématique

Capture de données de flux API

Dans cette étude de cas, un compte gratuit inscription au site Web https://polygon.io a été fait, ce qui permet d’interroger les données de fin de journée avec un plafond de 5 requêtes API maximum / minute. Compte tenu de cette limitation, les données horaires de seulement 3 titres ont été ingérées, pour démontrer le POC (Proof-of-Concept). Les téléspectateurs sont encouragés à utiliser un compte payant, qui prend en charge les données en temps réel avec des requêtes API illimitées, pour leur utilisation de développement/test/production.

Vous trouverez ci-dessous la capture d’écran de la requête HTTP avec réponse effectuée via Postman pour une sécurité unique, à implémenter dans Notebook, pour l’ingestion de données.

Demande d'API Postman

La réponse JSON contient la propriété nommée résultatsde type tableau d’objets contenant l’état horaire d’une sécurité spécifique.
o = ouvert / c = fermer / h = élevé / je = faible / v = volume échangé / t = horodatage (dans le style Unix)

Étape 01 : Créer un espace de travail de capacité Fabric

Pour le POC, nous créerons un espace de travail nommé Security Market, pour notre division de gestion de portefeuille, à l’aide du bouton Nouvel espace de travail (disponible pour l’administrateur Fabric), avec les paramètres conformes aux captures d’écran ci-dessous.

Paramètres de l'espace de travail Fabric

Il est crucial que dans l’onglet Premium des paramètres, il soit nécessaire de choisir la capacité Fabric (ou Trial), proposée par Lakehouse (voir capture d’écran ci-dessous).

Capacité de l’espace de travail Fabric

Une fois créé, il devrait ressembler à ci-dessous (voir capture d’écran ci-dessous).

Aperçu de l'espace de travail Fabric

Étape 02 : Configurer Lakehouse

Ensuite, nous créerons un nouveau Lakehouse pour héberger les données capturées du flux API. Cliquez sur le bouton Nouveau et choisissez plus d’options (si Lakehouse n’est pas visible dans le menu). Une page détaillée comme indiqué dans la capture d’écran ci-dessous apparaîtra.

Créer un menu Lakehouse

Utilisez l’option Lakehouse pour créer un nouveau Lakehouse. Renommez ce Lakehouse selon votre choix.

Lakehouse peut héberger des données structurées Tableau & Données semi-structurées / non structurées Sous-dossier pour stocker des fichiers bruts/traités. Nous allons créer un sous-dossier nommé EOD_Données pour stocker les données reçues de la demande d’API au format de fichier CSV, qui à leur tour seraient disponibles pour Data Scientist pour un traitement ultérieur (voir capture d’écran ci-dessous).

Option de création de dossier Lakehouse

Étape 03 : Créer un bloc-notes

Une fois que Lakehouse est prêt, nous pouvons passer à l’étape suivante, où nous écrirons du code Python pour capturer et ingérer des données. Cliquez sur Ouvrir Carnet de notes > Nouveau cahier pour initialiser un bloc-notes vierge (voir capture d’écran ci-dessous).

Option Créer un bloc-notes

Cela ouvrirait un bloc-notes vierge. Copiez-collez le code Python ci-dessous dans la cellule de code, comme indiqué dans la capture d’écran ci-dessous.

import datetime as dt
import requests as httpclient
from notebookutils import mssparkutils

api_key = 'hfoZ81xxxxxxxxxxxxxxxx'  # Secret API Key
symbol_list = ['MSFT', 'GOOG', 'PRFT']  # Symbol list

target_date = dt.datetime.today()
file_content="symbol,timestamp,open,high,low,close,volume\n"  # insert CSV header
dt_YYYYMMDD = target_date.strftime('%Y-%m-%d')  # YYYYMMDD

for symbol in symbol_list:  # Iterate through each symbol (security)
    api_url = f'https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/hour/{dt_YYYYMMDD}/{dt_YYYYMMDD}/?apiKey={api_key}'
    resp_obj = httpclient.get(api_url).json()
    for r in resp_obj['results']:  # Iterate through each rows of security for respective frequency of timestamp
        price_open, price_close, price_high, price_low, trade_volume = r['o'], r['c'], r['h'], r['l'], r['v']
        timestamp = dt.datetime.fromtimestamp(r['t']/1000).strftime('%Y-%m-%dT%H:%M:%S') # decode unix timestamp
        file_content += f'{symbol},{timestamp},{price_open},{price_high},{price_low},{price_close},{trade_volume}\n' # append row
    
mssparkutils.fs.put(f'Files/EOD_Data/{dt_YYYYMMDD}.csv', file_content)  # Save file into Datalake with Date identifier
df = spark.read.load(f'Files/EOD_Data/{dt_YYYYMMDD}.csv', format="csv", header=True, inferSchema=True) # Read file into dataframe
df.write.saveAsTable('nasdaq', mode="append")  # Append dataframe rows to "nasdaq" table

Exécutez le code ci-dessus après la fermeture du marché NASDAQ. Comprenons en un mot ce que fait ce code Python :

  1. Chaque plateforme Market Data propose une clé API secrète, qui doit être fournie dans l’URL ou l’en-tête HTTP (tel que défini dans la documentation de l’API).
  2. Juste pour expérimenter, nous avons sélectionné 3 titres MSFT (Microsoft Corp), GOOG (Alphabet Inc – Class C) et PRFT (Perficient Inc).
  3. L’URL nécessite que la date soit au format AAAA-MM-JJ, que contient la variable dt_YYYYMMDD.
  4. Ensuite, nous exécutons une boucle pour chaque sécurité que nous souhaitons interroger.
  5. La demande HTTP Get est adressée à la plate-forme Market API en préparant dynamiquement l’URL avec la date cible, la sécurité (symbole) et la clé API, en définissant la fréquence des données horaires à renvoyer.
  6. Dans la réponse JSON, la propriété result contient un tableau de modifications horaires des données des attributs de sécurité (comme ouverture/fermeture/haut/bas/etc.), comme illustré dans la capture d’écran de la demande du facteur. Veuillez vous référer à la documentation API de la plateforme de marché respective pour en savoir plus.
  7. Ensuite, nous exécutons une boucle pour itérer et capturer les données horaires et les ajoutons à une variable texte nommée file_content au format séparé par des virgules, pour préparer notre fichier CSV (notez que nous avons déjà écrit l’en-tête CSV dans la ligne n°9 du code).
  8. Après l’exécution des deux boucles, à la ligne n° 20, un fichier avec une structure de dénomination (AAAAMMJJ.csv) est créé sous le sous-dossier EOD_Data.
  9. Dans le dernier cas, ce fichier CSV enregistré est lu à l’aide du lecteur Spark dans le bloc de données et le résultat est ajouté à un tableau nommé «nasdaq» (Spark créera automatiquement une table si elle n’est pas trouvée).

Prévisualisons les données pour garantir le succès du script Python. Aller vers Maison au bord du lacdéveloppez Tables et assurez-vous d’avoir une table nommée « nasdaq » est créé. Reportez-vous à la capture d’écran ci-dessous pour des exemples de données.

Aperçu de la table Lakehouse

Étape 04 : Planifier le travail

Ce code de notebook doit être exécuté quotidiennement. Notebook offre une fonctionnalité de planification du code pour qu’il s’exécute automatiquement lors d’un événement à fréquence définie. Cette option est disponible dans Notebook sous l’option Exécuter > Planifier.

Menu de planification du bloc-notes

Cela ouvrirait la page d’options de planification détaillée comme ci-dessous. En supposant 16h00 HNE comme heure de fermeture et en ajoutant un tampon de 30 minutes pour des raisons de sécurité, appliquons une minuterie pour exécuter ce carnet quotidiennement à 16h30 (voir l’image ci-dessous).

Minuterie de planification du bloc-notes

Le travail se déroulerait quotidiennement même le week-end lorsque le marché est fermé. Idéalement, cela ne devrait pas affecter les analyses, car la position de fin de journée du vendredi se poursuivrait. Les data scientists sont libres de supprimer les données du week-end ou d’ignorer ces données de leurs autres scripts de calcul.

Étape 05 : Générer un modèle sémantique

Le modèle sémantique (anciennement connu sous le nom de Dataset) sert de source de données pour les rapports Power BI. Lakehouse contient une option pour générer un modèle sémantique offrant la possibilité de choisir des tables spécifiques à charger dans le modèle requis par le développeur BI (voir capture d’écran ci-dessous).

Modèle symétrique de charge Lakehouse

BI Developer peut s’appuyer davantage sur ce modèle sémantique pour créer des relations et des mesures. La seule limitation est que les colonnes calculées ne peuvent pas être ajoutées aux tables à partir de l’éditeur de modèle, car dans le backend, il n’y a pas de Power Query. Les colonnes doivent être ajoutées à l’aide de Notebook.

Conclusion

L’histoire ne s’arrête pas là mais continue avec la création de tableaux de bord et de rapports à partir de Power BI basés sur le modèle sémantique produit par Lakehouse. Fabric permet l’intégration d’une équipe de data scientists, d’ingénieurs de données et d’analystes de données sur une seule plateforme unifiée. L’administrateur Azure doit simplement provisionner la capacité Fabric, qui est évolutive tout comme la charge de travail Azure standard, basée sur les CU (unités de consommation), qui peuvent être modifiées toutes les heures, pour s’adapter aux heures de pointe de la charge de travail. Le blog a l’intention de partager quelques capacités de Fabric pour traiter des scénarios réels. Il existe de nombreux composants de Fabric tels que Data Activator, ML Model, Data Pipeline, qui, pour des cas d’utilisation de niveau plus complexes, peuvent être excellents pour l’exploration.






Source link