Fermer

septembre 4, 2024

Automatisation des workflows ETL avec Apache Airflow : une solution universelle pour la gestion des données

Automatisation des workflows ETL avec Apache Airflow : une solution universelle pour la gestion des données


Introduction

Une grande partie des données sont traitées quotidiennement de diverses manières, comme les transactions financières, les interactions avec les clients, les capteurs, les résultats de recherche, etc. Par exemple, les sociétés pharmaceutiques produisent des millions de données uniquement à partir d’essais cliniques. Cette énorme quantité de données doit être traitée avec précision et rapidité. Autrement, cela peut entraîner des complications telles qu’une lenteur des approbations réglementaires et une perte d’opportunités commerciales.

Révolutionner les flux de travail de données pharmaceutiques avec Apache Airflow

Les sociétés pharmaceutiques s’appuient sur des quantités massives d’informations qui doivent être soigneusement gérées à plusieurs étapes depuis les premiers stades de recherche et de développement, les tests cliniques et la soumission réglementaire finale. Une bonne gestion et automatisation de ces processus de données garantissent l’exactitude et les normes, tout en évitant des retards coûteux dans la mise sur le marché de nouveaux médicaments.

Apache Airflow est un outil ETL open source standard de l’industrie qui aide à gérer un flux de travail complexe, ce qui le rend également adapté à une utilisation dans l’industrie pharmaceutique. Nous présenterons un exemple de développement basé sur Airflow pour aider à automatiser le traitement des données d’essais cliniques.

Cas d’utilisation commerciale : traitement automatisé des données d’essais cliniques

Une société pharmaceutique mène plusieurs essais cliniques en même temps. L’étendue des données générées dans chaque essai clinique est vaste. Cela nécessite un traitement, un nettoyage et une analyse systématiques. La société a l’intention d’utiliser une solution automatisée pour piloter ETL (Extract-Transform-Load) pour la gestion des données d’essais cliniques, garantissant ainsi des rapports opportuns et précis aux agences de réglementation et aux autres parties prenantes.

Problèmes:

  • Volume et complexité des données: Réconciliation de millions de points de données sans intervention humaine
  • Conformité: Assurer l’exactitude des données et le dépôt dans des délais auprès des différentes autorités de régulation
  • Allocation des ressources: Affecter les efforts manuels consacrés au traitement des données aux activités stratégiques

En automatisant le processus ETL avec Apache Airflow, l’entreprise peut relever ces défis efficacement, en améliorant l’efficacité opérationnelle, en améliorant la précision des données et en garantissant la conformité.

diagramme de flux de travail du flux d'air

diagramme de flux de travail du flux d’air

Le diagramme ci-dessus montre le flux de travail de traitement des données cliniques à l’aide du flux d’air.

Voyons maintenant comment Apache Airflow peut être utilisé pour automatiser le flux de travail de traitement des données des essais cliniques. Nous allons créer un flux de travail factice (DAG) pour illustrer les étapes à suivre pour y parvenir avec des exemples de données.

Mise en œuvre de Traitement des données d’essais cliniques avec Apache Airflow

Qu’est-ce qu’Apache Airflow ?

Apache Airflow est une plate-forme open source conçue pour l’autorisation, la planification et la surveillance programmatiques des flux de travail. Les utilisateurs peuvent créer des DAG dans lesquels chaque nœud est une tâche interconnectée par des bords, qui définissent les dépendances entre les nœuds. Développé à l’origine chez Airbnb et remis à l’Apache Software Foundation, Airflow est actuellement utilisé pour aider les entreprises de divers secteurs à gérer des flux de travail complexes.

Principales fonctionnalités d’Apache Airflow

  • Dynamique: Les workflows Airflow sont définis en Python, permettant ainsi la génération dynamique de pipelines. Cela permet d’utiliser une logique complexe pour la création de pipeline.
  • Évolutif: Il peut évoluer pour gérer un nombre croissant de tâches en utilisant un moteur d’exécution distribué.
  • Extensible: Airflow a une architecture conçue pour l’ajout facile de nouveaux opérateurs, capteurs et crochets, ce qui le rend hautement personnalisable.
  • Interface utilisateur: Airflow dispose d’une interface utilisateur riche permettant de visualiser les pipelines, de suivre les progrès en temps réel et de déboguer les problèmes. De plus, Airflow permet à la fois le déclenchement manuel de tâches et la réexécution facile des tâches ayant échoué.
  • Planificateur: Le planificateur d’Airflow exécutera les tâches dans l’ordre approprié. Les tentatives seront traitées ; les flux de travail doivent s’exécuter à temps ou lorsque certains événements externes se produisent.
  • Intégrations: Airflow s’intègre à une grande variété de services et de systèmes, notamment des bases de données, du stockage cloud, des entrepôts de données et bien d’autres.

Condition préalable:
– Python doit être installé sur le système (python 3.7 ou version ultérieure)

Étape 1 : Configuration du flux d’air

Tout d’abord, installez Apache Airflow à l’aide des commandes suivantes :

– pip install apache-airflow
– airflow db init
– airflow users create \
      --username admin \
      --firstname Admin \
      --lastname User \
      --role Admin \
      --email admin@example.com

De plus, installez les packages nécessaires pour interagir avec S3, Postgres ou Redshift

– pip install boto3 pandas psycopg2-binary

Démarrez le service Apache Airflow

– airflow webserver --port 8080
– airflow scheduler

Étape 2 : Création du flux de travail (Dag)

Un DAG est un graphique acyclique dirigé dans Airflow, visualisant un flux de travail ; chaque nœud représente une tâche, tandis que les bords définissent les dépendances entre les tâches.

Ce qui suit est un DAG factice pour le flux de travail de traitement des données d’essais cliniques :

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
import boto3
import pandas as pd
import psycopg2

#session
session = boto3.Session(
aws_access_key_id=Variable.get('Access key'),
aws_secret_access_key=Variable.get('Secret access key'),
)

# Configuration for S3 and PostgreSQL
S3_BUCKET_NAME = 'airflow-demo-bucket1'
S3_KEY = 'source/clinical_data.csv'
CLEANED_DATA_KEY = 'source/cleaned_data/cleaned_clinical_data.csv'
ANALYSIS_SUMMARY_KEY = 'path/to/clinical_analysis_summary.csv'
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = '5432'
POSTGRES_DB = 'clinicTrial'
POSTGRES_USER = 'prince'
POSTGRES_PASSWORD = '12345'
POSTGRES_TABLE = 'clinical_trial_data'

def clean_column(df, column_name):
       for x in df.index:
           if df.loc[x, column_name] == '.' or df.loc[x, column_name] == ' ':
           df.loc[x, column_name] = None

def extract_clinical_data():
    # Extract clinical trial data from S3
    print("Extracting clinical trial data from S3...")
    s3 = session.client('s3')
    response = s3.get_object(Bucket=S3_BUCKET_NAME, Key=S3_KEY)
    clinical_data = pd.read_csv(response['Body'])
    clinical_data.to_csv('/tmp/clinical_data.csv', index=False)
    print("Data extraction complete.")

def clean_clinical_data():
    # Clean clinical trial data
    print("Cleaning clinical trial data...")
    clinical_data = pd.read_csv('/tmp/clinical_data.csv')
    clinical_data['Specimen_date'] = clinical_data['Specimen_date'].str.strip() 
    clinical_data['Specimen_date'] = pd.to_datetime(clinical_data['Specimen_date'], format="mixed")
    clean_column(clinical_data, "Date_of_Death")
    clinical_data['Date_of_Death'] = pd.to_datetime(clinical_data['Date_of_Death'], format="mixed")
    clinical_data['Date_of_Last_Follow_Up'] = pd.to_datetime(clinical_data['Date_of_Last_Follow_Up'], format="mixed")
    clinical_data['Time'] = pd.to_numeric(clinical_data['Time'])
    clinical_data['Event'] = clinical_data['Event'].astype(int) 
    clinical_data.to_csv('/tmp/cleaned_clinical_data.csv', index=False)
    # Upload the cleaned data to S3
    s3 = session.client('s3')
    s3.upload_file('/tmp/cleaned_clinical_data.csv', S3_BUCKET_NAME, CLEANED_DATA_KEY)
    print("Data cleaning complete.")

def analyze_clinical_data():
    # Analyze clinical trial data
    print("Analyzing clinical trial data...")
    s3 = session.client('s3')
    s3.download_file(S3_BUCKET_NAME, CLEANED_DATA_KEY, '/tmp/cleaned_clinical_data.csv')
    cleaned_data = pd.read_csv('/tmp/cleaned_clinical_data.csv')
    # Calculate survival time for each patient
    cleaned_data['Specimen_date'] = pd.to_datetime(cleaned_data['Specimen_date'], format="mixed")
    cleaned_data['Date_of_Death'] = pd.to_datetime(cleaned_data['Date_of_Death'], format="mixed")
    cleaned_data['Survival_Time'] = (cleaned_data['Date_of_Death'] - cleaned_data['Specimen_date']).dt.days
    summary = cleaned_data.groupby('Stage')['Survival_Time'].mean()
    summary.to_csv('/tmp/clinical_analysis_summary.csv')
    # Upload the analysis summary to S3
    s3.upload_file('/tmp/clinical_analysis_summary.csv', S3_BUCKET_NAME, ANALYSIS_SUMMARY_KEY)
    print(f"Data analysis complete. Summary:\n{summary}")

def load_data_to_postgres():
    # Load data into PostgreSQL
    print("Loading data into PostgreSQL...")
    s3 = session.client('s3')
    s3.download_file(S3_BUCKET_NAME, CLEANED_DATA_KEY, '/tmp/cleaned_clinical_data.csv')
    cleaned_data = pd.read_csv('/tmp/cleaned_clinical_data.csv')
    cleaned_data.dropna(inplace=True)
    conn = psycopg2.connect(
      dbname=POSTGRES_DB,
      user=POSTGRES_USER,
      password=POSTGRES_PASSWORD,
      host=POSTGRES_HOST,
      port=POSTGRES_PORT
    )
    cursor = conn.cursor()
    # Create table if not exists (for simplicity)
    create_table_query = f"""
       CREATE TABLE IF NOT EXISTS {POSTGRES_TABLE} (
       PatientID INT,
       Specimen_date DATE,
       Dead_or_Alive VARCHAR(5),
       Date_of_Death DATE,
       Date_of_Last_Follow_Up DATE,
       sex VARCHAR(1),
       race VARCHAR(1),
       Stage VARCHAR(10),
       Event INT,
       Time INT
       );
      """
      cursor.execute(create_table_query)
      # Insert data into the table
      for _, row in cleaned_data.iterrows():
             insert_query = f"""
              INSERT INTO {POSTGRES_TABLE} (PatientID, Specimen_date, Dead_or_Alive, Date_of_Death, Date_of_Last_Follow_Up, sex, race, Stage, Event, Time)
              VALUES ({row['PatientID']}, '{row['Specimen_date']}', '{row['Dead_or_Alive']}', '{row['Date_of_Death']}', '{row['Date_of_Last_Follow_Up']}', '{row['sex']}', '{row['race']}', '{row['Stage']}', {row['Event']}, {row['Time']});
             """
      cursor.execute(insert_query)
      conn.commit()
      cursor.close()
      conn.close()
      print("Data loaded into PostgreSQL.")

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 7, 9),
'retries': 1,
}

with DAG('clinical_trial_data_processing', default_args=default_args, schedule_interval="@daily", catchup=False) as dag:
         start = DummyOperator(task_id='start')
         extract = PythonOperator(
                task_id='extract_clinical_data',
                python_callable=extract_clinical_data
               )

         clean = PythonOperator(
               task_id='clean_clinical_data',
               python_callable=clean_clinical_data
               )

          analyze = PythonOperator(
                task_id='analyze_clinical_data',
                python_callable=analyze_clinical_data
               )

          load = PythonOperator(
              task_id='load_data_to_postgres',
              python_callable=load_data_to_postgres
               )

          end = DummyOperator(task_id='end')
          start >> extract >> clean >> analyze >> load >> end

Étape 3 : Exécuter le flux de travail

  1. Créez un dossier dags et enregistrez le fichier DAG sous clinic_trial_data_processing_dag.py dans le répertoire dags de votre installation Airflow.
  2. Démarrez le serveur Web et le planificateur Airflow s’ils ne sont pas déjà en cours d’exécution
  3. serveur Web Airflow – port 8080
  4. planificateur de débit d’air
  5. Accédez à l’interface utilisateur Web d’Airflow en accédant à http://localhost:8080 dans votre navigateur Internet.
  6. Activez le DAG clinic_trial_data_processing à partir de l’interface utilisateur Web et surveillez son exécution.
airflow-workflow-web-ui

airflow-workflow-web-ui.png

Avantages gagnés par l’entreprise :

Apache Airflow aide les entreprises pharmaceutiques à :

  • Améliorer l’efficacité : Automatisez les tâches répétitives et libérez certaines des ressources les plus précieuses
  • Précision améliorée : Réduire les erreurs humaines dans le traitement des données
  • Conformité: Veiller à ce que les rapports soient soumis à temps et avec exactitude aux divers organismes de réglementation
  • Évolutivité : Gérez facilement des volumes croissants de données à mesure que la taille des essais cliniques augmente

Conclusion

Apache Airflow est en mesure d’aider les entreprises pharmaceutiques à transformer la gestion des données d’une tâche écrasante en un système rationalisé. Il devrait s’agir d’un élément important dans la stratégie d’une entreprise. Boîte à outils de transformation numérique pour stimuler l’innovation et atteindre l’efficacité opérationnelle.






Source link