Site icon Blog ARC Optimizer

Surveiller le flux de travail avec Apache Airflow

Surveiller le flux de travail avec Apache Airflow


Apache Airflow est un outil open source permettant de créer, de planifier et de surveiller par programmation des flux de travail. C’est l’une des plates-formes les plus robustes utilisées par les ingénieurs de données pour orchestrer les flux de travail ou les pipelines. Vous pouvez facilement visualiser les dépendances, la progression, les journaux, le code, les tâches de déclenchement et l’état de réussite de vos pipelines de données.

Avec l’aide d’Apache Airflow et de Python, nous pouvons facilement créer des flux de travail sans utiliser l’infrastructure sous-jacente pour l’évolutivité et la sécurité.

Qu’est-ce que le flux de travail ?

En termes simples, le flux de travail est une séquence de tâches. Le flux de travail sera démarré dans les délais ou déclenché par un événement. Workflow est utilisé pour gérer les pipelines de traitement de données volumineuses.

Les workflows basés sur Airflow ont des tâches où les sorties sont des entrées pour d’autres tâches. Dans Workflow, le bouclage n’est pas possible car la sortie de chaque étape est une entrée dans l’étape suivante.

Exemple typique de flux de travail

Dans Airflows, ces flux de travail sont représentés sous forme de graphiques acycliques dirigés (DAG).

Qu’est-ce que le DAG ?

Un DAG est défini dans un script Python, qui représente la structure des DAG (tâches et leurs dépendances) sous forme de code. DAG n’est rien d’autre qu’un ensemble de tâches que vous souhaitez exécuter les unes après les autres. Chaque nœud du graphe est une tâche. En d’autres termes, lorsque nous décidons du flux de travail, nous devons réfléchir au travail à diviser en petites tâches.

Le graphe est forcé d’être acyclique afin qu’il n’y ait pas de dépendances circulaires qui peuvent provoquer des boucles d’exécution infinies

Un DAG n’a pas de cycles, un DAG est un pipeline de données dans Apache Airflow. Ainsi, chaque fois que vous lisez « DAG », cela signifie « pipeline de données ».

Flux de travail valide du DAG

Flux de travail non valide du DAG

Qu’est-ce que Node ?

Node n’est rien d’autre qu’un opérateur. Un opérateur est une collection de logique de ce que nous voulons réaliser dans DAG. (Par exemple : si nous voulons écrire un DAG en Python, nous aurons un opérateur Python, de même si nous voulons exécuter le script en opérateur bash, il y aura un opérateur Bash.)

Quel est le besoin d’Apache Airflow dans Trending Technologies ?

  1. Moniteur de flux de travail
    • La surveillance joue un rôle crucial dans la gestion des données. Apache airflow nous permet de surveiller les flux de travail. Il garantit que vos systèmes et processus fonctionnent comme prévu. Les capacités de surveillance sont vraiment importantes pour les entreprises qui utilisent Airflow pour orchestrer et planifier leurs tâches de longue durée.
  1. Planificateur
    • Le planificateur est très important dans la gestion des données. Les ingénieurs de données ne peuvent pas garder un œil les uns sur les autres ainsi que sur chaque flux de travail. Avec l’aide de Scheduler, nous pouvons planifier un Workflow puis déclencher les instances de tâches une fois leurs dépendances terminées. Dans les coulisses, le planificateur lance un sous-processus qui surveille et reste synchronisé avec tous les DAG dans le répertoire DAG spécifié. Une fois par minute, par défaut, le planificateur collecte les résultats de l’analyse DAG et vérifie si des tâches actives peuvent être déclenchées.
  1. Interface utilisateur de surveillance
    • L’interface utilisateur native d’Airflow vous permet de visualiser les statuts de votre DAG et de vos tâches. De plus, vous pouvez surveiller quelques métriques natives à partir de cette interface utilisateur, mais il y a beaucoup de place pour l’amélioration (nous y reviendrons plus tard). Cela peut vous aider à effectuer une surveillance légère et un dépannage pour vos DAG.
  1. API de repos
    • L’API d’Airflow vous permet de créer des flux de travail à partir de sources externes et d’être un produit de données par-dessus. L’API reste vous permet d’utiliser le même paradigme que celui utilisé pour créer des pipelines, pour créer des flux de travail asynchrones, tels que des opérations de formation personnalisées en apprentissage automatique.
  1. Systèmes d’alerte
    • Airflow fournit un système d’alerte par défaut sur les tâches ayant échoué, le courrier électronique est la valeur par défaut, mais l’alerte via Slack peut être configurée à l’aide d’un rappel et de l’opérateur Slack.

Amazon Web Services (AWS) fournit un type de service similaire appelé Flux de travail gérés par Amazon pour Apache Airflow (MWAA). Alors, commençons par MWAA et voyons comment nous pouvons surveiller les flux de travail.

Conditions préalables:

  • Compartiment S3
  • Réseau VPC
  • Environnement de flux de travail

Tu devras…

  • Créer un compartiment S3 dans lequel vous pouvez télécharger du code Python
  • Créer un VPC et des sous-réseaux
  • Créer un flux de travail dans MWAA

Le tableau de bord d’Apache Workflow devrait ressembler à ce qui est illustré ci-dessous.

Créez un code Python en conséquence pour surveiller le flux de travail. Après un exemple de code Python, vous allez télécharger dans un compartiment S3.

à partir de la date et de l’heure d’importation de la date et de l’heure

# L’objet DAG ; nous en aurons besoin pour instancier un DAG

du DAG d’importation de flux d’air

# Les opérateurs; nous en avons besoin pour fonctionner !

depuis airflow.operators.python importer l’opérateur Python

depuis airflow.operators.dummy importer DummyOperator

#de tâches.get_configs importer get_configs

#de tâches.get_targets importer get_targets

#de tâches.push_targets importer push_targets

def print_hello() :

retour « Bonjour le monde »

dag = DAG (‘hello_world’ ,description=’testing’,

schedule_interval=’0 12 * * *’,

date_début = dateheure (2021 ,1,1), rattrapage=Faux)

dummy_operator = DummyOperator(task_id=’dummy_task’, retries=3, dag=dag)

hello_operator = PythonOperator(task_id=’hello_task’, python_callable=print_hello, dag=dag)

opérateur_fictif >> opérateur_bonjour

Une fois cela fait, nous accéderons à l’interface utilisateur d’Airflow et nous obtiendrons le tableau de bord de surveillance suivant d’Airflow. Ici, nous pouvons planifier un flux de travail, surveiller notre flux de travail et gérer les alertes.

Comment Perficient peut-il vous aider?

Perficient est un partenaire certifié d’Amazon Web Services avec plus de 10 ans d’expérience dans la fourniture d’applications et d’expertise au niveau de l’entreprise dans les solutions de plate-forme cloud, le centre de contact, la modernisation des applications, les migrations, l’analyse de données, les outils mobiles, de développement et de gestion, l’IoT, le sans serveur, la sécurité. , et plus. Associé à notre stratégie et à notre équipe de pointe, Perficient est équipé pour aider les entreprises à relever les défis les plus difficiles et à tirer le meilleur parti de leurs implémentations et intégrations.

Apprenez-en plus sur notre pratique AWS et contactez notre équipe ici !

A propos de l’auteur

Praful Itankar est un consultant technique avec plus de 7 ans d’expérience dans les technologies cloud. Il a hâte de partager ses connaissances sur les dernières tendances technologiques par le biais de blogs.

Plus de cet auteur






Source link
Quitter la version mobile