Lire les données Azure Eventhub sur DataFrame

Lecture des données Azure EventHub dans DataFrame à l’aide de Python dans Databricks
Azure EventHubs offre un service puissant pour traiter de grandes quantités de données. Dans ce guide, nous découvrirons comment lire efficacement les données d’Azure EventHub et les convertir en DataFrame à l’aide de Python dans Databricks. Cette procédure pas à pas simplifie l’interaction entre Azure EventHubs et la facilité d’utilisation des DataFrames.
Conditions préalables:
Avant de plonger dans le code, assurez-vous de disposer de la configuration et des autorisations nécessaires :
- Connaissance de base de la configuration d’EventHubs, de Key Vaults et de la gestion des secrets.
- Instance Azure EventHub créée (dans cet exemple, nous utiliserons « myehub »).
- Accédez à Azure Key Vault pour stocker et accéder en toute sécurité aux informations d’identification requises.
- Connaissance de base de Scala, Apache Spark et Databricks Notebooks.
1. Configuration de la configuration :
td_scope = "kv-test-01-dev" namespace_name = "contosoehubns" shared_access_key_name = "test" eventhub = "myehub" shared_access_key = dbutils.secrets.get(scope=td_scope, key="KEY") # Construct the connection string connection = f"Endpoint=sb://{namespace_name}.servicebus.windows.net/;SharedAccessKeyName={shared_access_key_name};SharedAccessKey={shared_access_key};EntityPath={eventhub}" # Define the consumer group consumer_group = "$Default"
Tout d’abord, ce script initialise la configuration pour accéder à Azure EventHub dans un environnement Databricks. Il définit des paramètres tels que la portée, l’espace de noms, les détails de la clé d’accès et l’EventHub lui-même. De plus, il construit la chaîne de connexion nécessaire à l’interface avec le service EventHub, permettant une consommation transparente des données.
2. Lire les données EventHub
Utilisez le SDK Azure pour Python (azure-eventhub) pour lire les données d’EventHub. Ensuite, définissez une fonction (read_event
) pour traiter les événements entrants et imprimer les données et les métadonnées associées.
pip install azure-eventhub
from azure.eventhub import EventHubConsumerClient def read_event(partition_context, event): event_data = event.body_as_str() enqueued_time = event.enqueued_time partition_id = partition_context.partition_id # Process data or perform operations here print(event_data) print(enqueued_time) partition_context.update_checkpoint(event) # Create an EventHub consumer client client = EventHubConsumerClient.from_connection_string(connection, consumer_group, eventhub_name="data01") with client: # Start receiving events client.receive(on_event=read_event, starting_position="-1")
La fonction, read_event
, est appelé pour chaque événement reçu de EventHub. Il extrait des informations de l’événement, telles que event_data
(le contenu de l’événement), enqueued_time
(l’heure à laquelle l’événement a été mis en file d’attente), et partition_id
(ID de la partition à partir de laquelle l’événement a été reçu). Dans cet exemple, il imprime simplement les données d’événement et l’heure de mise en file d’attente, mais c’est ici que vous traiterez ou analyserez généralement les données selon les besoins de votre application.
Ici, un exemple de EventHubConsumerClient
est créé à l’aide du from_connection_string
méthode. Cela nécessite des paramètres comme connection
(qui contient la chaîne de connexion à Azure EventHub), consumer_group
(le nom du groupe de consommateurs), et eventhub_name
(le nom de l’EventHub).
Finalement, le client.receive
La méthode lance le processus de consommation d’événements. Le on_event
Le paramètre spécifie la fonction (read_event
dans ce cas) qui sera appelé pour chaque événement reçu. Le starting_position
Le paramètre spécifie à partir de quel point du flux d’événements le client doit commencer à consommer des événements (« -1 » indique à partir des événements les plus récents).
En outre, la sortie de cette cellule donnerait des événements continus jusqu’à leur arrêt.
3. Transformer en DataFrame
Pour convertir les données reçues en DataFrame, utilisez les capacités de Pandas dans Databricks. Initialisez un DataFrame avec les données d’événement reçues dans le read_event
fonctionner et effectuer les transformations selon les besoins.
import pandas as pd # Inside read_event function data = pd.DataFrame({ 'Event_Data': [event_data], 'Enqueued_Time': [enqueued_time], 'Partition_ID': [partition_id] }) # Further data manipulation or operations can be performed here # For example: # aggregated_data = data.groupby('Some_Column').mean() # Display the DataFrame in Databricks display(data)
C’est pourquoi nous avons décrit ici le processus de lecture des données Azure EventHub à l’aide de Python dans Databricks. Le SDK Python Azure EventHub fournit les outils nécessaires pour consommer et traiter les données entrantes, et en tirant parti de Pandas DataFrames, vous pouvez gérer et manipuler efficacement ces données dans l’environnement Databricks.
Expérimentez diverses transformations et techniques d’analyse sur le DataFrame pour obtenir des informations significatives à partir des données ingérées.
Consultez ce lien pour obtenir des conseils sur la lecture des « données Azure EventHub dans un DataFrame à l’aide de Scala dans Databricks », ainsi qu’un aperçu concis de la configuration des EventHubs, des KeyVaults et de la gestion des secrets.
Source link