Fermer

juin 6, 2025

Construire et orchestrer des pipelines de données complexes / blogs / perficient

Construire et orchestrer des pipelines de données complexes / blogs / perficient


Dans cet article, nous plongerons dans l’orchestration des pipelines de données avec l’API Databricks Jobs, vous permettant d’automatiser, de surveiller et d’échapper les flux de travail de manière transparente dans la plate-forme Databricks.

Pourquoi orchestrer avec l’API des travaux de données de données?

Lorsque les pipelines de données deviennent complexes impliquant plusieurs étapes – comme exécuter des cahiers, la mise à jour des tables Delta ou des modèles d’apprentissage automatique, vous avez besoin d’un moyen fiable de les automatiser et de les gérer facilement. L’API Databricks Jobs offre un moyen flexible et efficace d’automatiser vos travaux / flux de travail directement au sein de Databricks ou à partir de systèmes externes (par exemple AWS Lambda ou Azure Fonctions) en utilisant les points de terminaison de l’API.

Contrairement aux orchestrateurs externes tels que Apache Airflow, Dagster etc., qui nécessitent une infrastructure et une intégration distinctes, l’API Jobs est construit nativement dans la plate-forme Databricks. Et la meilleure partie? Cela ne coûte rien de plus. L’API Databricks Jobs vous permet de gérer pleinement le cycle de vie de vos travaux / flux de travail à l’aide de simples demandes HTTP.

Vous trouverez ci-dessous la liste des points de terminaison de l’API pour les opérations CRUD sur les workflows:

  • Créer: Configurez de nouveaux travaux avec des tâches et des configurations définies via le message /api/2.1/jobs/create Définissez des travaux uniques ou multi-tâches, en spécifiant les tâches à exécuter (par exemple, cahiers, pots, scripts Python), leurs dépendances et les ressources de calcul.
  • Récupérer: Accédez aux détails du travail, vérifiez les statuts et révisez les journaux d’exécution à l’aide de Get /api/2.1/jobs/get ou obtenir /api/2.1/jobs/list.
  • Mise à jour: Modifier les paramètres du travail tels que les paramètres, les séquences de tâches ou les détails du cluster via la publication /api/2.1/jobs/update et /api/2.1/jobs/reset.
  • Supprimer: Supprimer des travaux qui ne sont plus nécessaires en utilisant le post /api/2.1/jobs/delete.

Ces capacités de crud complètes font de l’API de l’emploi un outil puissant pour automatiser complètement la gestion de l’emploi, de la création et de la surveillance à la modification et à la suppression – éliminant le besoin de manipulation manuelle.

Composants clés d’un travail de databricks

  • Tâches: Unités individuelles de travail dans un travail, comme l’exécution d’un cahier, d’un pot, d’un script Python ou d’une tâche DBT. Les travaux peuvent avoir plusieurs tâches avec des dépendances définies et une exécution conditionnelle.
  • Dépendances: Relations entre les tâches qui déterminent l’ordre d’exécution, vous permettant de créer des flux de travail complexes avec des étapes séquentielles ou parallèles.
  • Groupes: Les ressources de calcul sur lesquelles les tâches se déroulent. Ceux-ci peuvent être éphémères grappes d’emploi créé spécifiquement pour le travail ou existant Clusters polyvalents partagé entre les emplois.
  • Retries: Configuration pour réessayer automatiquement les tâches échouées pour améliorer la fiabilité du travail.
  • Planification: Options pour exécuter des travaux sur des horaires basés sur Cron, des événements déclenchés ou à la demande.
  • Notifications: Alertes pour le démarrage, le succès ou l’échec de l’emploi à tenir les équipes informées.

Commencer avec le Databricks API Jobs

Avant de tirer parti de l’API des travaux de données de données pour l’orchestration, assurez-vous d’avoir accès à un espace de travail Databricks, un jeton d’accès personnel valide (PAT) et des privilèges suffisants pour gérer les ressources de calcul et les configurations de travail. Ce guide parcourra les opérations clés de CRUD et les points de terminaison de l’API de l’emploi pertinent pour une automatisation robuste du workflow.

1. Création d’un nouveau travail de travail / travail:

Pour créer un emploi, vous envoyez une demande de poste au /api/2.1/jobs/create Point de terminaison avec une charge utile JSON définissant la configuration du travail.

{
  "name": "Ingest-Sales-Data",
  "tasks": [
    {
      "task_key": "Ingest-CSV-Data",
      "notebook_task": {
        "notebook_path": "/Users/name@email.com/ingest_csv_notebook",
        "source": "WORKSPACE"
      },
      "new_cluster": {
        "spark_version": "15.4.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 30 9 * * ?",
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": {
    "on_failure": [
      "name@email.com"
    ]
  }
}

Cette charge utile JSON définit un travail de databricks qui exécute une tâche basée sur un cahier sur un cluster nouvellement provisionné, prévue pour s’exécuter quotidiennement à 9h30 UTC. Les composants de la charge utile sont expliqués ci-dessous:

  • Nom: le nom de votre travail.
  • Tâches: un tableau de tâches à exécuter. Un travail peut avoir une ou plusieurs tâches.
    • Task_key: un identifiant unique pour la tâche dans le travail. Utilisé pour définir les dépendances.
    • Notebook_task: spécifie une tâche de carnet. Les autres types de tâches incluent Spark_Jar_Task, Spark_Python_Task, Spark_Submit_Task, Pipeline_Task, etc.
      • Notebook_path: le chemin d’accès au cahier de votre espace de travail Databricks.
      • Source: la source du cahier (par exemple, Workspace, Git).
    • new_cluster: définit la configuration d’un nouveau cluster qui sera créé pour cette exécution. Vous pouvez également utiliser existant_cluster_id pour utiliser un cluster tout usage existant (bien que de nouveaux groupes de travail soient recommandés).
      • Spark_version, node_type_id, num_workers: options de configuration standard du cluster.
  • Horaire: définit le calendrier d’emploi à l’aide d’une expression Cron et d’un fuseau horaire.
  • Email_Notifications: configure les notifications par e-mail pour les événements de travail.

Pour créer un flux de travail Databricks, la charge utile JSON ci-dessus peut être incluse dans le corps d’une demande post-demande envoyée à l’API Créer un point de terminaison de l’API – en utilisant Curl ou par programmation via la bibliothèque Python Demandes comme indiqué ci-dessous:

Utilisation de Curl:

curl -X POST \
  https://<databricks-instance>.cloud.databricks.com/api/2.1/jobs/create \
  -H "Authorization: Bearer <Your-PAT>" \
  -H "Content-Type: application/json" \
  -d '@workflow_config.json' #Place the above payload in workflow_config.json

Utilisation de la bibliothèque de requêtes Python:

import requests
import json
create_response = requests.post("https://<databricks-instance>.cloud.databricks.com/api/2.1/jobs/create", data=json.dumps(your_json_payload), auth=("token", token))
if create_response.status_code == 200:
    job_id = json.loads(create_response.content.decode('utf-8'))["job_id"]
    print("Job created with id: {}".format(job_id))
else:
    print("Job creation failed with status code: {}".format(create_response.status_code))
    print(create_response.text)

L’exemple ci-dessus a démontré un flux de travail de base unique. Cependant, le plein potentiel de l’API Jobs réside dans l’orchestration des flux de travail multi-tâches avec des dépendances. Le tasks Le tableau dans la charge utile du travail vous permet de configurer plusieurs tâches dépendantes.
Par exemple, le flux de travail suivant définit trois tâches qui s’exécutent séquentiellement: Ingest-CSV-DataTransform-Sales-DataWrite-to-Delta.

{
  "name": "Ingest-Sales-Data-Pipeline",
  "tasks": [
    {
      "task_key": "Ingest-CSV-Data",
      "notebook_task": {
        "notebook_path": "/Users/name@email.com/ingest_csv_notebook",
        "source": "WORKSPACE"
      },
      "new_cluster": {
        "spark_version": "15.4.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    },
    {
      "task_key": "Transform-Sales-Data",
      "depends_on": [
        {
          "task_key": "Ingest-CSV-Data"
        }
      ],
      "notebook_task": {
        "notebook_path": "/Users/name@email.com/transform_sales_data",
        "source": "WORKSPACE"
      },
      "new_cluster": {
        "spark_version": "15.4.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    },
    {
      "task_key": "Write-to-Delta",
      "depends_on": [
        {
          "task_key": "Transform-Sales-Data"
        }
      ],
      "notebook_task": {
        "notebook_path": "/Users/name@email.com/write_to_delta_notebook",
        "source": "WORKSPACE"
      },
      "new_cluster": {
        "spark_version": "15.4.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 30 9 * * ?",
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": {
    "on_failure": [
      "name@email.com"
    ]
  }
}

Photo1

2. Updating Existing Workflows:

For modifying existing workflows, we have two endpoints: the update endpoint /api/2.1/jobs/update and the reset endpoint /api/2.1/jobs/reset. The update endpoint applies a partial update to your job. This means you can tweak parts of the job — like adding a new task or changing a cluster spec — without redefining the entire workflow. While the reset endpoint does a complete overwrite of the job configuration. Therefore, when resetting a job, you must provide the entire desired job configuration, including any settings you wish to keep unchanged, to avoid them being overwritten or removed entirely. Let us go over a few examples to better understand the endpoints better.

2.1. Update Workflow Name & Add New Task:

Let us modify the above workflow by renaming it from Ingest-Sales-Data-Pipeline to Sales-Workflow-End-to-End, adding an input parametersource_location to the Ingest-CSV-Data, and introducing a new task Write-to-Postgres, which runs after the successful completion of Transform-Sales-Data.

{
  "job_id": 947766456503851,
  "new_settings": {
    "name": "Sales-Workflow-End-to-End",
    "tasks": [
      {
        "task_key": "Ingest-CSV-Data",
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/ingest_csv_notebook",
          "base_parameters": {
            "source_location": "s3://<bucket>/<key>"
          },
          "source": "WORKSPACE"
        },
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      },
      {
        "task_key": "Transform-Sales-Data",
        "depends_on": [
          {
            "task_key": "Ingest-CSV-Data"
          }
        ],
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/transform_sales_data",
          "source": "WORKSPACE"
        },
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      },
      {
        "task_key": "Write-to-Delta",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/write_to_delta_notebook",
          "source": "WORKSPACE"
        },
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      },
      {
        "task_key": "Write-to-Postgres",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/write_to_postgres_notebook",
          "source": "WORKSPACE"
        },
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      }
    ],
    "schedule": {
      "quartz_cron_expression": "0 30 9 * * ?",
      "timezone_id": "UTC",
      "pause_status": "UNPAUSED"
    },
    "email_notifications": {
      "on_failure": [
        "name@email.com"
      ]
    }
  }
}

Photo2

2.2. Mettre à jour la configuration du cluster:

Le démarrage des cluster peut prendre plusieurs minutes, en particulier pour les grappes plus grandes et plus complexes. Le partage du même cluster permet aux tâches suivantes de démarrer immédiatement après les précédentes terminées, accélérant l'ensemble du flux de travail. Les tâches parallèles peuvent également fonctionner simultanément en partageant efficacement les mêmes ressources de cluster. Mettons à jour le flux de travail ci-dessus pour partager le même cluster entre toutes les tâches.

{
  "job_id": 947766456503851,
  "new_settings": {
    "name": "Sales-Workflow-End-to-End",
    "job_clusters": [
      {
        "job_cluster_key": "shared-cluster",
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      }
    ],
    "tasks": [
      {
        "task_key": "Ingest-CSV-Data",
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/ingest_csv_notebook",
          "base_parameters": {
            "source_location": "s3://<bucket>/<key>"
          },
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Transform-Sales-Data",
        "depends_on": [
          {
            "task_key": "Ingest-CSV-Data"
          }
        ],
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/transform_sales_data",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Write-to-Delta",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path": "/Users/name@email.com/write_to_delta_notebook",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Write-to-Postgres",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/write_to_postgres_notebook",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      }
    ],
    "schedule": {
      "quartz_cron_expression": "0 30 9 * * ?",
      "timezone_id": "UTC",
      "pause_status": "UNPAUSED"
    },
    "email_notifications": {
      "on_failure": [
        "name@email.com"
      ]
    }
  }
}

Photo3

2.3. Mettre à jour les dépendances des tâches:

Ajoutons une nouvelle tâche nommée Enrich-Sales-Data et mettre à jour la dépendance comme indiqué ci-dessous:
Ingest-CSV-Data →
Enrich-Sales-Data → Transform-Sales-Data →[Write-to-Delta, Write-to-Postgres].Étant donné que nous mettons à jour les dépendances des tâches existantes, nous devons utiliser le point de terminaison de réinitialisation /api/2.1/jobs/reset.

{
  "job_id": 947766456503851,
  "new_settings": {
    "name": "Sales-Workflow-End-to-End",
    "job_clusters": [
      {
        "job_cluster_key": "shared-cluster",
        "new_cluster": {
          "spark_version": "15.4.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "num_workers": 2
        }
      }
    ],
    "tasks": [
      {
        "task_key": "Ingest-CSV-Data",
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/ingest_csv_notebook",
          "base_parameters": {
            "source_location": "s3://<bucket>/<key>"
          },
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Enrich-Sales-Data",
        "depends_on": [
          {
            "task_key": "Ingest-CSV-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/enrich_sales_data",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Transform-Sales-Data",
        "depends_on": [
          {
            "task_key": "Enrich-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/transform_sales_data",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Write-to-Delta",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/write_to_delta_notebook",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      },
      {
        "task_key": "Write-to-Postgres",
        "depends_on": [
          {
            "task_key": "Transform-Sales-Data"
          }
        ],
        "notebook_task": {
          "notebook_path":"/Users/name@email.com/write_to_postgres_notebook",
          "source": "WORKSPACE"
        },
        "job_cluster_key": "shared-cluster"
      }
    ],
    "schedule": {
      "quartz_cron_expression": "0 30 9 * * ?",
      "timezone_id": "UTC",
      "pause_status": "UNPAUSED"
    },
    "email_notifications": {
      "on_failure": [
        "name@email.com"
      ]
    }
  }
}

Photo4

Le point de terminaison de mise à jour est utile pour des modifications mineures telles que la mise à jour du nom du workflow, la mise à jour du chemin d'accès à l'ordinateur portable, les paramètres d'entrée aux tâches, la mise à jour de la planification de l'emploi, la modification des configurations de cluster comme le nombre de nœuds, etc., tandis que le point de terminaison de réintégration doit être utilisé pour supprimer les tâches existantes, redéfinir les dépendances des tâches, renammer les tasks, etc.
Le point de terminaison de mise à jour ne supprime pas les tâches ou les paramètres que vous omettez, c'est-à-dire que les tâches non mentionnées dans la demande rester inchangétandis que le point de terminaison de réinitialisation supprime / supprime tous les champs ou tâches non inclus dans la demande.

3 et 3 Déclencher un travail / flux de travail existant:

Utiliser le/api/2.1/jobs/run-now Point de terminaison pour déclencher un travail à la demande. Passez les paramètres d'entrée à vos tâches de cahier à l'aide dunotebook_paramschamp.

curl -X POST https://<databricks-instance>/api/2.1/jobs/run-now \
  -H "Authorization: Bearer <DATABRICKS_TOKEN>" \
  -H "Content-Type: application/json" \
  -d '{
    "job_id": 947766456503851,
    "notebook_params": {
      "source_location": "s3://<bucket>/<key>"
    }
  }'

4. Obtenez le statut du travail:

Pour vérifier l'état d'un travail spécifique, utilisez le /api/2.1/jobs/runs/get Point de terminaison avec le run_id. La réponse comprend des détails sur la course, y compris son état (par exemple, en attente, en cours d'exécution, terminé, échoué, etc.).

curl -X GET \
  https://<databricks-instance>.cloud.databricks.com/api/2.1/jobs/runs/get?run_id=<your-run-id> \
  -H "Authorization: Bearer <Your-PAT>"

5. Supprimer le travail:

Pour supprimer un flux de travail existant de données, appelez simplement le DELETE /api/2.1/jobs/delete Point de terminaison à l'aide de l'API Jobs. Cela vous permet de nettoyer programmativement des travaux obsolètes ou inutiles dans le cadre de votre stratégie de gestion de pipelines.

curl -X POST https://<databricks-instance>/api/2.1/jobs/delete \
  -H "Authorization: Bearer <DATABRICKS_PERSONAL_ACCESS_TOKEN>" \
  -H "Content-Type: application/json" \
  -d '{ "job_id": 947766456503851 }'

Conclusion:

L'API Databricks Jobs permet aux ingénieurs de données d'orchestrer les flux de travail complexes nativement, sans compter sur des outils de planification externes. Que vous automatisant des exécutions de cahier, le chaînage de pipelines en plusieurs étapes ou que vous vous intégriez à des systèmes CI / CD, l'API offre un contrôle et une flexibilité à grains fins. En maîtrisant cette API, vous ne construisez pas seulement les workflows - vous construisez des pipelines de données de qualité de production évolutifs qui sont plus faciles à gérer, à surveiller et à évoluer.






Source link