Fermer

mars 10, 2024

Comprendre le rôle de Py4J dans Databricks / Blogs / Perficient

Comprendre le rôle de Py4J dans Databricks / Blogs / Perficient


J’ai mentionné que ma tentative de implémenter TDD avec Databricks n’a pas été totalement couronné de succès. La configuration de l’environnement local n’était pas un problème et l’obtention d’un identifiant de service pour le composant CI/CD était plus un problème administratif que technique. L’utilisation de simulations pour tester des objets Python sérialisés sur Spark est en fait le problème. Il s’agit d’une fonctionnalité, pas d’un bug. Parlons de comment comment Python et Étincelle travailler ensemble.

PySpark sous le capot

Apache Spark est écrit en Échelle et Java. Les deux langages de programmation fonctionnent dans le Machine virtuelle Java (JVM). Python s’exécute dans un interprètepas une JVM. PySpark permet aux développeurs d’utiliser Python pour exécuter des tâches Spark en tirant parti Py4J.

Py4J permet aux programmes Python exécutés dans un interpréteur Python d’accéder dynamiquement aux objets Java dans une machine virtuelle Java. Les méthodes sont appelées comme si les objets Java résidaient dans l’interpréteur Python et que les collections Java étaient accessibles via les méthodes de collection Python standard. Py4J permet également aux programmes Java de rappeler des objets Python.

PySpark peut utiliser Spark comme moteur pour soumettre et calculer des tâches. Une JVM est lancée lorsque vous créez et initialisez une session Spark avec Pyspark (pyspark.SparkContext ou pyspark.sql.SparkSession). PySpark utilise Py4J côté pilote pour communiquer avec cette instance JVM. Les travailleurs Python sont lancés paresseusement lorsque les fonctions natives Python doivent être mappées à leurs homologues Java. Cela fonctionne pour tous les objets Python standard. Cela fonctionne si bien que la plupart des développeurs n’ont même pas besoin de réfléchir au comment ou au pourquoi cela fonctionne. Moi non plus jusqu’à mon aventure avec TDD dans un environnement local.

POO et PySpark

Dans mon article précédent, j’ai expliqué que je devais gérer la journalisation et l’observabilité dans Databricks plutôt que dans Azure et que Databricks ne dispose pas de fonctionnalités natives pour cela. Aucun problème. Tout d’abord, j’ai créé des exceptions personnalisées afin que le client puisse facilement identifier la responsabilité en cas de problème (le propriétaire de l’API, l’équipe Databricks ou nous). Voici littéralement tout le code :

class APIError(Exception):
    """Exception raised for errors in the API call."""
    
class ValidationError(Exception):
    """Exception raised for validation errors."""

class DataError(Exception):
    """Exception raised for errors in data processing."""
Intelligence des données - L'avenir du Big Data
L’avenir du Big Data

Avec quelques conseils, vous pouvez créer une plateforme de données adaptée aux besoins de votre organisation et tirer le meilleur parti de votre capital de données.

Obtenez le guide

Ensuite, il y avait une classe Logging qui pouvait prendre une chaîne ou une exception, puis créer une entrée de journal au niveau des informations ou des erreurs et stocker un enregistrement dans une table delta :

def log_message(message: Optional[str] = None, exception: Optional[Exception] = None) -> None:

        ... some simple code to handle either a string or an exception ...

        spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

        df = spark.createDataFrame(
            [(datetime.now(), logger.name, log_level.upper(), log_content)],
            schema="timestamp timestamp, calling_method string, log_level string, message string"
        )


        df.write.format("delta").mode("append").saveAsTable("default.logging")

La journalisation des erreurs provenant d’autres scripts Python était assez simple :

from ..utils.custom_exceptions import ValidationError, APIError
from ..utils.custom_logging import log_message

def some_validation_method(start_date, end_date)
    try {
        if start_date >= end_date:
            raise ValidationError("Start date must be before end date")
    except ValidationError as e:
        log_message(exception=e)

J’ai créé des tests unitaires à l’aide de simulations. Cela a pris beaucoup de temps car se moquer de Spark n’est pas facile, mais finalement tous les tests ont réussi et j’ai eu une couverture complète du code. Honnêtement, je me sentais plutôt bien avec le code. C’était assez simple et direct. Je suis allé dans l’interface utilisateur de Databricks pour exécuter le bloc-notes que j’ai créé pour la surveillance afin d’ajouter des enregistrements factices à la table de journalisation et de vérifier l’UX.

Problèmes de cornichons

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 38.0 failed 4 times, most recent failure: Lost task 3.3 in stage 38.0 (TID 55) (10.139.64.6 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for main.utils.custom_exceptions.APIError). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class. at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759) at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199) at net.razorvine.pickle.Unpickler.load(Unpickler.java:109) at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122) at org.apache.spark.api.python.SerDeUtil$.$anonfun$pythonToJava$2(SerDeUtil.scala:126)

En fait, il m’a fallu une seconde pour réaliser que je regardais une trace de pile et non une trame de données.

Mes connaissances techniques du décapage Python manquait de profondeurCependant, j’ai passé beaucoup de temps dans Hadoop et j’ai une assez bonne maîtrise des erreurs SerDe. Divulgation complète, je crée habituellement des objets dans Scala, donc je n’avais pas vu cette erreur auparavant dans Databricks. Mais je suis un développeur Java de longue date et je sais lire un stacktrace. Première étape, allez vérifier le code si vous le pouvez. Toi peut. En regardant le code, je peux voir qu’il traite toutes les classes non prises en charge/non enregistrées comme un dict. C’est pourquoi j’ai montré le code de ma classe ; ce n’est pas un dicton. S’il existe des classes non prises en charge/non enregistrées, il doit y avoir des classes prises en charge/enregistrées. J’ai vérifié le LISEZMOI et j’ai trouvé la liste. Et évidemment, aucune classe personnalisée ne figurerait jamais sur la liste. Je n’enregistre certainement pas un IObjectConstructor personnalisé. Et c’est ainsi que j’ai commencé le voyage qui a abouti à une lecture sur Py4J.

Conclusion

Qu’ai-je appris ? Pour faire court, transmettez simplement des dataframes à des objets simples comme des chaînes ou des entiers dans PySpark. Dans mon cas, par exemple, transmettre la représentation sous forme de chaîne de l’objet était trivial puisque l’objet n’était qu’une chaîne. Ce n’est pas un gros problème. Cependant, j’ai dû accepter le fait que j’étais à l’aise avec un code qui ne fonctionnait pas. Les tests unitaires étaient plus compliqués que mon code, mais c’est en fait assez courant. De plus, je l’aurais attrapé lors d’un test d’intégration. Dans un autre blog, je montrerai comment j’ai géré les moqueries de Spock. Probablement. Le fait est que j’ai eu un résultat très différent lors de l’exécution d’un morceau de code à partir de l’interface utilisateur Databricks et à partir de mon instance locale. J’en ai suffisamment appris pour ne plus refaire la même erreur. J’ai refactorisé mon code pour qu’il soit encore plus résilient maintenant. J’ai de meilleures techniques de gestion des exceptions. Et je continuerai à utiliser VS Code comme principal outil de codage pour Databricks. Mais j’aurai toujours un notebook Databricks ouvert dans un navigateur car on ne sait jamais.






Source link