Fermer

mai 31, 2020

Création de pipelines modulaires dans Azure Data Factory à l'aide de données JSON


Les pipelines Azure Data Factory (ADF) sont puissants et peuvent être complexes. Dans cet article, je partage quelques leçons et pratiques pour les rendre plus modulaires afin d'améliorer la réutilisation et la gérabilité.

Les pipelines sont composés d'activités, à mesure que le nombre d'activités et de conditions augmente, la difficulté de maintenir le pipeline augmente également. Ceci est similaire à n'importe quelle procédure dans le code, plus il devient long de plus en plus de pouvoir éditer, lire, comprendre. Par conséquent, les meilleures pratiques logicielles promeuvent les procédures de refactorisation en éléments de fonctionnalité plus petits. Cela aide de nombreuses façons, comme tester, déboguer et apporter des modifications incrémentielles. La même approche peut être adoptée avec les pipelines. Cependant, il y a certaines choses sur les pipelines à prendre en compte pour ce faire:

  • Décomposer en unités de travail
  • Utiliser la dynamique avec des paramètres
  • Utiliser les expressions pour le comportement
  • Qu'en est-il des résultats de sortie [19659004] Fonctions utilisées et données JSON pour transmettre des messages

Unité de travail

Tout comme le code, un pipeline doit se concentrer sur une fonction spécifique et être autonome. La logique, les règles et les opérations seront une «boîte noire» pour les utilisateurs du pipeline, ne nécessitant que la connaissance de «ce que fait» plutôt que de «comment» il le fait. Les pipelines ADF incluent le nom (requis) et la description qui peuvent être utilisés pour fournir la documentation de base du pipeline pour communiquer avec ses consommateurs.

 Description du pipeline

Paramètres d'entrée

Pour être réutilisable, il doit il est évident qu'un pipeline devra permettre des paramètres d'entrée afin que chaque invocation puisse exécuter la fonction comme requis. Un pipeline peut avoir plusieurs paramètres. Les paramètres peuvent être de différents types: String, Int, Float, Bool, Array, Object et SecureString. Chaque paramètre peut également inclure une valeur par défaut, rendant une valeur de paramètre spécifique facultative lors de l'exécution du pipeline.

 Paramètres du pipeline

Expressions

Une partie des règles d'un pipeline peut inclure des expressions qui traitent les valeurs de paramètre fournies lorsque courir. Ces expressions peuvent préparer les données pour les activités au sein du pipeline. L'utilisation d'expressions introduit une dépendance sur le contenu et le format des valeurs des paramètres au-delà de la simple exigence ou de l'option. Par exemple, un paramètre String peut être une valeur séparée par des virgules (CSV) qui sera gérée par une expression. Dans ces cas, la description du pipeline doit détailler cela et / ou d'autres documents doivent accompagner le pipeline afin que les utilisateurs comprennent comment doit être utilisé.

 Expression de paramètre

Dans cet exemple, l'expression dynamique pour les éléments de ForEach L'activité PackageName utilise le paramètre PackageNames comme chaîne CSV pour itérer sur les valeurs fournies:

 @split (pipeline (). Parameters.PackageNames, ',') 

Il y aura plus d'exemples dans le monde réel Exemples section qui suit.

Résultats de sortie

Tout comme pour les paramètres d'entrée, pour être réutilisable, il devrait être évident qu'un pipeline devra renvoyer des détails sur les activités effectuées afin que les pipelines ultérieurs ou autres des activités peuvent avoir lieu. Tout comme le code où une méthode ou une procédure renvoie vide ce n'est pas une exigence et les résultats du pipeline peuvent être limités aux jeux de données manipulés ou à d'autres artefacts de l'exécution du pipeline. Cependant, dans un cas modulaire, il est probable que le pipeline devra transmettre des détails pour d'autres opérations. L'une des leçons apprises est le fait que les pipelines ADF ne fournissent PAS un moyen de définir les sorties d'une manière similaire aux paramètres d'entrée. Au lieu de cela, le pipeline ne renvoie qu'une seule valeur, le RunId . RunId se présente sous la forme d'un identifiant globalement unique (guid). Afin d'associer plus de détails sur les résultats du pipeline (par exemple, la ou les valeurs de sortie) à d'autres opérations, une certaine forme de stockage de ces résultats sera nécessaire afin que les opérations suivantes puissent faire référence selon les besoins.

J'ai trouvé Azure Le stockage sur table est une solution pratique pour cela. En utilisant une table avec une clé de partition pour le nom du pipeline et une clé de ligne pour RunId d'autres données peuvent être incluses selon les besoins. Un pipeline qui doit fournir un résultat plus détaillé insérera une ligne. Ensuite, les activités nécessitant ces détails peuvent lire la ligne en fonction de RunId du pipeline spécifique pendant l'exécution.

 Pipeline Runid

Dans cet exemple, une activité Azure Function est utilisée pour insérer l'enregistrement via l'expression Body:

 @concat ('{
    "partitionKey": "', pipeline (). Pipeline,'",
    "rowKey": "', pipeline (). RunId,'",
    "body": "', variables (' body '),'"
} ') 

Il y aura des exemples plus détaillés dans la section Exemples réels qui suit.

Utilisation des fonctions Azure et des données JSON

Pour développer les résultats de sortie stockés dans Azure Rangées de table de stockage pour les exécutions de pipeline, j'ai trouvé la nécessité pour différents pipelines d'avoir très probablement des valeurs de sortie différentes. Cela ne devrait pas être surprenant, tout comme vous vous attendriez à ce que les paramètres d'entrée diffèrent en fonction du pipeline et des fonctionnalités qu'il implémente. Pour résoudre ce problème, j'ai exploité les données JSON au format chaîne en tant que colonne Body . Les données JSON peuvent avoir un schéma dynamique pour chaque pipeline selon les besoins. Cependant, ce schéma JSON est désormais une dépendance pour les utilisateurs du pipeline. Comme indiqué dans le paramètre d'entrée CSV ci-dessus, lorsque les «appelants» d'un pipeline doivent comprendre les exigences de format d'entrée, ces mêmes «appelants» devront comprendre le schéma de sortie Body afin d'extraire les détails contenus. . Les expressions ADF jouent un rôle clé dans la création du corps «messages» insérés dans le tableau et l'analyse du même corps dans les valeurs d'autres activités.

Cela se fait généralement avec une variable définie comme indiqué dans le pipeline ExportPackage ci-dessus. Ce faisant, conserve l'expression pour appeler la fonction Azure plus simple et les détails du schéma dans l'expression d'activité Définir une variable.

 Expression Json

Cette expression d'exemple crée une chaîne JSON à partir d'un autre pipeline et / ou valeurs d'activité. La chaîne JSON est codée en base64 car elle sera utilisée comme valeur du membre JSON Body de la méthode Azure Function.

 @ base64 (concat ('{
    "packageName": "', pipeline (). parameters.PackageName,'",
    "executionId": "', guid (),'"
} ')) 

Remarque: l'utilisation de guid () comme valeur d'exécution ID de l'API est à titre d'exemple uniquement et simule le comportement de l'API. Plus d'informations à ce sujet dans la section suivante.

Dans le cadre d'un projet récent d'exportation de données Microsoft Dynamics 365 dans un entrepôt de données à l'aide du API de gestion de données REST package il était crucial de concevoir les pipelines afin qu'ils puissent être exécuté dynamiquement pour différents scénarios et les étapes résultantes du processus d'exportation être coordonnées.

 Microsoft - Le guide essentiel de l'engagement des utilisateurs finaux de Microsoft Teams

Ce processus comprend trois pipelines de composants:

  • ExportPackage: invoque le D365 API REST, spécifiant un nom de package souhaité et renvoyant et ExecutionId pour le package de demande
  • GetPackage: pour le ExecutionId renvoyé, une fois l'exportation réussie, téléchargez le fichier zip du package, extrayez le fichier zip vers Azure Blob Storage et renvoyer le chemin de stockage pour le contenu du package
  • ProcessPackage: pour chaque entité de données dans le contenu du package exporté, insérez les données dans les données entrepôt

Remarque: dans les exemples ci-dessous, les appels d'API et les activités d'entrepôt de données sont simulés avec une activité d'attente afin que le pipeline puisse démontrer le modèle modulaire sans aucune dépendance externe.

Pipeline ExportPackage

Ce pipeline est assez simple, utilise simplement le paramètre PackageName pour appeler l'API D365 REST pour déclencher le processus d'exportation. L'API renvoie un ExecutionId à utiliser pour surveiller le processus asynchrone avant que la sortie puisse être récupérée. À des fins de démonstration, l'API renvoie ici un nouveau guid comme ExecutionId .

 Export Pipeline

Input :

 PackageName, string [19659019]  Sortie : 

 @ base64 (concat ('{
    "packageName": "', pipeline (). parameters.PackageName,'",
    "executionId": "', guid (),'"
} ')) 

GetPackage Pipeline

Ce pipeline est en réalité plus complexe que ce qui est montré ici. Cela montre un seul appel d'API pour obtenir la sortie du package. En réalité, le package doit interroger l'état de l'exportation jusqu'à la fin. Une fois terminé, il lance un appel API pour récupérer l'URL de téléchargement. Avec l'URL, le fichier zip de sortie peut être copié et extrait vers Azure Blob Storage. Ces détails pour l'API D365 ne sont pas nécessaires pour montrer la modularité du pipeline. À des fins de démonstration, le nombre d'entités est un entier aléatoire compris entre 1 et 5 pour simuler différents contenus de package.

 Get Pipeline

Input :

 PackageName, string
ExecutionId, chaîne 

Sortie :

 @ base64 (concat ('{
    "packageName": "', pipeline (). parameters.PackageName,'",
    "executionId": "', guid (),'",
    "entityCount": ', chaîne (rand (1,6)),'
} ')) 

Pipeline ProcessPackage

Ce pipeline doit itérer sur chaque entité dans le contenu de sortie du package.

 Process Pipeline

Input : [1965901818] ] PackageName, chaîne
ExecutionId, chaîne
EntityCount, int

Output :

 @concat ('{
    "partitionKey": "', pipeline (). Pipeline,'",
    "rowKey": "', pipeline (). RunId,': ', item (),'",
    "body": "', base64 (concat (' {
            "packageName": "', pipeline (). parameters.PackageName,'",
            "executionId": "', pipeline (). parameters.ExecutionId,'",
            "entityNo": ', item (),'
        } ')),' "
} ') 

ProcessPackage Pipeline – ForEach Entity

Le cœur du pipeline est le pour chaque entité, qui a effectué la montée de l'entité de données dans l'entrepôt de données.

 Process Pipeline Foreach

Tout le pipeline

Ce pipeline est la coordination des trois pipelines. Il utilise le paramètre PackageNames CSV en tant que lot pour exporter, obtenir et traiter.

 Tout

Entrée :

 PackageNames, chaîne (CSV) 

Sortie :

 aucune 

Tous les pipelines – ForEach PackageName

Pour chaque nom de package dans le paramètre CSV, le pipeline ExportPackage est exécuté. Étant donné que chaque pipeline ExportPackage enregistre les données de sortie, le RunId de chaque pipeline est ajouté à une variable de tableau afin que les résultats puissent être utilisés pour l'activité suivante.

 Tous Foreach Pkg

All Pipeline – ForEach Export

Après l'exécution de l'exportation pour chaque nom de package dans le paramètre CSV, pour chaque RunId le GetPackage et ProcessPackage les pipelines sont exécutés. Dans ce pipeline, une activité Azure Function est utilisée pour obtenir les données du ExportPackage RunId associé et l'utilise pour exécuter le GetPackage . pipeline De la même manière, une autre activité Azure Function est utilisée pour obtenir les données du GetPackage RunId associé et l'utilise pour exécuter le ProcessPackage pipeline.

 Tous les exportations Foreach

Obtenez les résultats ExportPackage :

 @concat ('{
    "partitionKey": "ExportPackage",
    "rowKey": "', item (),'"
} ') 

Obtenez GetPackage résultats:

 @concat (' {
    "partitionKey": "GetPackage",
    "rowKey": "', json (activity (' Execute Get '). output) .pipelineRunId,'"
} ') 

Remarque: à partir de la sortie du pipeline GetPackage la valeur pipelineRunId est nécessaire pour lire l'enregistrement de table comme clé de ligne. La valeur de sortie de l'activité d'exécution du pipeline est convertie en un objet json afin de référencer la valeur de la propriété.

Les fichiers JSON des exemples de pipelines, ainsi que le fichier de code de la fonction Azure, sont disponibles ModularADFwithJSON .

À propos de l'auteur

Gary Brandt est un architecte de solutions senior spécialisé dans les solutions personnalisées conception, développement et livraison à l'aide de la plateforme Microsoft et des services cloud Azure. Il a plus de 20 ans d'expérience en développement et en conseil et a vu beaucoup de technologies différentes au fil des ans. Il est toujours enthousiasmé par les changements émergents et pour voir comment ils auront un impact sur le travail que nous faisons aujourd'hui et à l'avenir.

More from this Author






Source link