Fermer

janvier 11, 2022

Consigner les exécutions de pipeline dans un fichier dans Azure Data Factory


Pourquoi Azure Data Factory ?

Aujourd'hui, les données générées par les applications de nos clients sont de plus en plus exponentielles, surtout si les données proviennent de plusieurs produits différents. Les organisations disposent de plusieurs types de données situées dans le cloud et sur site, dans des formats structurés, non structurés et semi-structurés, tous arrivant à des fréquences et des vitesses différentes. Il sera essentiel d'analyser et de stocker toutes ces données. Azure Data Factory (ADF) est un service d'intégration de données basé sur le cloud qui résout exactement ces scénarios complexes.

ADF stocke d'abord les données à l'aide d'un stockage de lac de données. Une fois stockées, les données sont analysées, puis à l'aide de pipelines, ADF transforme les données à organiser pour la publication. Une fois les données publiées, nous pouvons les visualiser à l'aide d'applications telles que Power BI, Tableau.

Pour un examen plus approfondi d'ADF et de ses fonctions de base, veuillez consulter le article de blog de mon collègue ici.

Enregistrer les exécutions de pipeline dans un fichier dans Azure Data Factory

Les solutions d'intégration de données sont complexes avec de nombreuses pièces mobiles et l'une des principales choses que nos clients souhaitent est de s'assurer qu'ils sont en mesure de surveiller leurs workflows ou pipelines d'intégration de données. Cependant, Data Factory Monitor ne stocke que les données du pipeline exécuté pendant 45 jours avec des informations très limitées. Mais avec les exécutions de pipeline de journal, nous pouvons stocker des données de journal personnalisées dans Azure Data Lake Storage (ADLS) plus longtemps à l'aide d'une requête.

Comment créer un fichier journal CSV dans Azure Data Lake Store.

Pour démonstration fins, j'ai déjà créé un pipeline d'activité de copie de tables qui copiera les données d'un dossier à un autre dans un conteneur d'ADLS.

Nous allons maintenant voir comment l'activité de copie des données générera des journaux personnalisés dans le fichier .csv. Nous allons commencer par ajouter l'activité de copie des données à côté des tables de copie dans le canevas.

Copier les tables

Pour l'ensemble de données source, car nous devons définir la requête dans la source de l'activité de copie des données, je sélectionnera l'ensemble de données en tant que serveur SQL sur site en sélectionnant le service lié du serveur SQL sur site.

Nouvel ensemble de données

Après avoir créé l'ensemble de données source, j'ajouterai une requête à la source. Cette requête contiendra une variable système de pipeline et d'autres métriques que je peux récupérer sur chaque tâche individuelle.

Copier les données

Vous trouverez ci-dessous la liste actuelle des variables système de pipeline.

@pipeline().DataFactory – Nom de la fabrique de données

@pipeline().Pipeline – Nom du pipeline

@pipeline().RunId – ID de l'exécution du pipeline

@pipeline().TriggerType – Type du déclencheur qui a appelé le pipeline (Manuel, Planifié)

@pipeline().TriggerName – Nom du déclencheur qui appelle le pipeline

@pipeline().TriggerTime – Heure à laquelle le déclencheur a appelé le pipeline.

Contenu dynamique

Query –

SELECT '@{pipeline().DataFactory}' comme DataFactory_Name,

'@{pipeline().Pipeline}' comme Pipeline_Name,

'@{activity('copytables').output. l'exécutif utionDetails[0].source.type}' comme Source_Type,

'@{activity('copytables').output.executionDetails[0].sink.type}' comme Sink_Type,

'@{activity(' copytables').output.executionDetails[0].status}' comme Execution_Status,

'@{activity('copytables').output.rowsRead}' comme RowsRead,

'@{activité('copytables') .output.rowsCopied}' as RowsWritten

'@{activity('copytables').output.copyDuration}' as CopyDurationInSecs,

'@{activity('copytables').output.executionDetails[0].start }' comme CopyActivity_Start_Time,

'@{utcnow()}' comme CopyActivity_End_Time,

'@{pipeline().RunId}' comme RunId,

'@{pipeline().TriggerType}' comme TriggerType ,

'@{pipeline().TriggerName}' comme TriggerName,

'@{pipeline().TriggerTime}' comme TriggerTime

La requête ci-dessus écrira les informations sur les événements dans un fichier .CSV. Pour cela, nous devons définir un ensemble de données de récepteur qui créera un répertoire dans le conteneur ADLS et le fichier journal CSV. L'instantané ci-dessous vous montre que j'ai sélectionné le type de jeu de données Azure Data Lake Store Gen 2 et le format de fichier .CSV.

Data Lake

J'ai utilisé ci-dessous le chemin paramétré qui garantira que le fichier journal est généré dans la structure de dossiers correcte avec le nom de fichier approprié.

Contenu dynamique dans le nom de fichier –

@concat(formatDateTime(convertTimeZone(utcnow(),'UTC',' Heure standard du Centre'),'dd'),'/',item().filename,'_',formatDateTime(convertTimeZone(utcnow(),'UTC','Central Standard Time'),'dd-MM-yy_hh -mm-ss'),'_log')

Avec cela, nous avons terminé la configuration du pipeline de journal, après avoir enregistré le pipeline, je dois le publier et exécuter mon pipeline. Maintenant, je peux voir le fichier journal généré dans le conteneur ADLS.

Méthode d'authentification

Après avoir téléchargé le fichier, nous pouvons voir que, selon notre requête, toute la sortie est renseignée dans le fichier .CSV.

Fichier de sortie .csv

Ainsi, nous avons ainsi configuré le pipeline de journal avec l'activité de copie d'aide dans ADF. Le principal avantage de la configuration du pipeline de journal est que nous pouvons obtenir la sortie de l'événement personnalisé sous forme de données dans un fichier .CSV qui aidera un client à vérifier l'état d'exécution, les lignes lues et les lignes écrites, etc.

Pourquoi Perficient ?

Notre plus de 20 ans d'expérience dans les données dans tous les secteurs nous donnent une compréhension approfondie des tendances actuelles des données. En tant que partenaire primé, Gold-Certified Microsoft et l'un des rares fournisseurs de solutions nationaux, nous sommes un expert reconnu du cloud avec des années d'expérience à aider les entreprises à tirer le meilleur parti du cloud Microsoft.[19659003]Prêt à rassembler vos données pour tirer parti des analyses avancées avec Azure ? Contactez notre équipe à propos de cette solution.

À propos de l'auteur <!– :   rdhande, Consultant technique senior–>

Rohit Dhande est consultant technique senior chez Perficient, avec plus de 6 ans d'expérience dans les solutions Cloud et sur site. Il a une expérience pratique de la transformation des données à l'aide de l'outil ETL, de l'entreposage de données, des configurations d'applications, de la gestion du code source, de la gestion des correctifs, de la création, de l'automatisation, de la gestion et de la publication de code dans différents environnements et du déploiement sur des serveurs. Il possède une certification mondiale Red Hat et Microsoft Azure. Rohit est un résolveur de problèmes créatif et amusant qui aime travailler avec l'équipe pour créer des résultats exceptionnels.

Plus de cet auteur




Source link