Fermer

février 15, 2021

Gestion des erreurs de consommateur Kafka, réessayer et récupération


Ce billet de blog concerne la résilience des consommateurs de Kafka lorsque nous travaillons avec Apache Kafka et Spring Boot.

En tant que scénario, supposons qu'un consommateur Kafka interroge les événements à partir d'une rubrique PackageEvents.

classe de service (Package service) est responsable du stockage des événements consommés dans une base de données.

Remarque: Ici à la place de la base de données, il peut s'agir d'un appel à une API ou à une application tierce.

 Image pour le message

Je wo u ld aiment expliquer comment gérer les exceptions au niveau du service, où une exception peut être en service comme validation ou tout en persistant dans une base de données ou cela peut également être le cas lorsque vous appelez une API.

Consommateur Kafka:

Pour créer un consommateur écoutant un certain sujet, nous utilisons @KafkaListener (topics = {"packages -received ”}) sur une méthode de l'application Spring Boot.

Ici," packages-reçus "est le sujet à partir duquel interroger les messages.

 @KafkaListener (topics = {" packages-received "})
public void packagesListener (ConsumerRecord  packageInfoEvent) {

    log.info ("Evénement reçu pour conserver packageInfoEvent: {}", packageInfoEvent.value ());

} 

En général, Kafka Listener obtient toutes les propriétés telles que groupId, key et value serializer information spécifiées dans les fichiers de propriétés par le bean «kafkaListenerFactory».

En termes simples, le bean «kafkaListenerFactory» est la clé de la configuration de Kafka Listener.

Si nous devons configurer la configuration de l'écouteur Kafka en écrasant le comportement par défaut, vous devez créer votre bean «kafkaListenerFactory» et définir les configurations souhaitées.

 Covid 19

C'est ce que nous allons utiliser pour configurer la gestion des erreurs, réessayer et récupérer pour l'écouteur / consommateur Kafka.

 @Bean
ConcurrentKafkaListenerContainerFactory  kafkaListenerContainerFactory (
        Configurer ConcurrentKafkaListenerContainerFactoryConfigurer,
        ObjectProvider <ConsumerFactory > kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory  factory = new ConcurrentKafkaListenerContainerFactory <> ();
    configurer.configure (usine, kafkaConsumerFactory);
    
    retour usine;
} 

Kafka Retry:

En général, les exceptions d'exécution provoquées dans la couche de service, ce sont les exceptions causées par le service (DB, API) auquel vous essayez d'accéder est en panne ou a un problème.

Ces Les exceptions sont celles qui peuvent réussir lorsqu'elles sont essayées plus tard.

L'extrait de code suivant montre comment configurer une nouvelle tentative avec RetryTemplate.

En retour, RetryTemplate est défini avec la stratégie Retry qui spécifie le nombre maximal de tentatives que vous souhaitez réessayer et quelles sont les exceptions que vous souhaitez réessayer et celles qui ne doivent pas être retentées.

 public class ConsumerConfig {

    @Haricot
    ConcurrentKafkaListenerContainerFactory  kafkaListenerContainerFactory (
            Configurer ConcurrentKafkaListenerContainerFactoryConfigurer,
            ObjectProvider <ConsumerFactory > kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory  factory = new ConcurrentKafkaListenerContainerFactory <> ();
        configurer.configure (usine, kafkaConsumerFactory);

        factory.setRetryTemplate (retryTemplate ());

        retour usine;
    }

    private RetryTemplate retryTemplate () {

        RetryTemplate retryTemplate = new RetryTemplate ();

      / * ici, la politique de réessai est utilisée pour définir le nombre de tentatives de réessai et les exceptions que vous vouliez essayer et ce que vous ne voulez pas réessayer. * /
         retryTemplate.setRetryPolicy (getSimpleRetryPolicy ());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy () {
        Map <Class Boolean> exceptionMap = new HashMap <> ();
     
        // la valeur booléenne dans la carte détermine si l'exception doit être retentée
        exceptionMap.put (IllegalArgumentException.class, false);
        exceptionMap.put (TimeoutException.class, true);

        retourne le nouveau SimpleRetryPolicy (3, exceptionMap, true);
    }

} 

lorsque l'événement échoue, même après avoir réessayé certaines exceptions pour le nombre maximum de tentatives, la phase de récupération démarre.

La relance et la récupération vont de pair,

si le nombre de tentatives est épuisé , La récupération testera si l'exception d'événement est récupérable et prendra les mesures de récupération nécessaires, comme la remettre pour réessayer le sujet ou l'enregistrer dans la base de données pour essayer pour plus tard.

Si l'exception d'événement n'est pas récupérable, elle la transmet simplement à le gestionnaire d'erreur. Nous parlerons de la gestion des erreurs dans une minute ici.

Kafka Recovery:

Il existe une méthode manuelle setRecoveryCallBack () sur ConcurrentKafkaListenerContainerFactory où elle accepte le paramètre de contexte Retry,

ici nous obtenons le contexte (après max tentatives), il contient des informations sur l'événement.

Implémentation de la récupération:

 @Configuration
@ Slf4j
public class ConsumerConfig {

    @Haricot
    ConcurrentKafkaListenerContainerFactory  kafkaListenerContainerFactory (
            Configurer ConcurrentKafkaListenerContainerFactoryConfigurer,
            ObjectProvider <ConsumerFactory > kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory  factory = new ConcurrentKafkaListenerContainerFactory <> ();
        configurer.configure (usine, kafkaConsumerFactory);

        factory.setRetryTemplate (retryTemplate ());

        factory.setRecoveryCallback ((contexte -> {

            if (context.getLastThrowable (). getCause () instanceof RecoverableDataAccessException) {

                // ici vous pouvez faire votre mécanisme de récupération où vous pouvez revenir sur le sujet en utilisant un producteur Kafka

            } autre{

                // ici, vous pouvez enregistrer des choses et lancer une exception personnalisée dont le gestionnaire d'erreur prendra en charge.
                throw new RuntimeException (context.getLastThrowable (). getMessage ());
            }
            
            return null;

        }));

        retour usine;
    }

    private RetryTemplate retryTemplate () {

        RetryTemplate retryTemplate = new RetryTemplate ();

      / * ici, la stratégie de nouvelle tentative est utilisée pour définir le nombre de tentatives
        réessayer et quelles exceptions vous vouliez essayer et
         ce que vous ne voulez pas réessayer. * /

        retryTemplate.setRetryPolicy (getSimpleRetryPolicy ());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy () {
        Map <Class Boolean> exceptionMap = new HashMap <> ();
        exceptionMap.put (IllegalArgumentException.class, false);
        exceptionMap.put (TimeoutException.class, true);

        retourne le nouveau SimpleRetryPolicy (3, exceptionMap, true);
    }

} 

Traitement des erreurs Kafka:

Pour toute exception dans le processus de l'événement consommé, une erreur est consignée par Kafka «LoggingErrorHandler.class» dans le package org.springframework.kafka.listener, [19659012] LoggingErrorHandler implémente l'interface «ErrorHandler».

nous pouvons implémenter notre propre gestionnaire d'erreurs en implémentant l'interface «ErrorHandler».

 @Configuration
@ Slf4j
public class ConsumerConfig {
    @Haricot
    ConcurrentKafkaListenerContainerFactory  kafkaListenerContainerFactory (
            Configurer ConcurrentKafkaListenerContainerFactoryConfigurer,
            ObjectProvider <ConsumerFactory > kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory  factory = new ConcurrentKafkaListenerContainerFactory <> ();
        configurer.configure (usine, kafkaConsumerFactory);
        factory.setErrorHandler (((exception, données) -> {

         / * ici, vous pouvez effectuer une gestion personnalisée, je suis juste en train de le connecter comme le fait le gestionnaire d'erreur par défaut
        Si vous souhaitez simplement vous connecter. vous n'avez pas besoin de configurer le gestionnaire d'erreurs ici. Le gestionnaire par défaut le fait pour vous.
         En général, vous conserverez les enregistrements ayant échoué dans la base de données pour suivre les enregistrements ayant échoué. * /

         log.error ("Erreur en cours de traitement avec l'exception {} et l'enregistrement est {}", exception, données);
        }));
        retour usine;
    }
} 

Extrait de code toutes les stratégies fonctionnant ensemble

 package com.pack.events.consumer.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Configuration
@ Slf4j
public class ConsumerConfig {

    @Haricot
    ConcurrentKafkaListenerContainerFactory  kafkaListenerContainerFactory (
            Configurer ConcurrentKafkaListenerContainerFactoryConfigurer,
            ObjectProvider <ConsumerFactory > kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory  factory = new ConcurrentKafkaListenerContainerFactory <> ();
        configurer.configure (usine, kafkaConsumerFactory);

        factory.setRetryTemplate (retryTemplate ());

        factory.setRecoveryCallback ((contexte -> {

            if (context.getLastThrowable (). getCause () instanceof RecoverableDataAccessException) {

                // ici vous pouvez faire votre mécanisme de récupération où vous pouvez revenir sur le sujet en utilisant un producteur Kafka

            } autre{

                // ici, vous pouvez enregistrer des choses et lancer une exception personnalisée dont le gestionnaire d'erreur se chargera.
                throw new RuntimeException (context.getLastThrowable (). getMessage ());
            }

            return null;

        }));

        factory.setErrorHandler (((exception, données) -> {

           / * ici, vous pouvez effectuer une gestion personnalisée, je suis juste en train de le connecter comme le fait le gestionnaire d'erreur par défaut
          Si vous souhaitez simplement vous connecter. vous n'avez pas besoin de configurer le gestionnaire d'erreurs ici. Le gestionnaire par défaut le fait pour vous.
          En règle générale, vous conserverez les enregistrements ayant échoué dans la base de données pour suivre les enregistrements ayant échoué. * /

            log.error ("Erreur en cours de traitement avec l'exception {} et l'enregistrement est {}", exception, données);
        }));

        retour usine;
    }

    private RetryTemplate retryTemplate () {

        RetryTemplate retryTemplate = new RetryTemplate ();

       / * ici, la politique de réessai est utilisée pour définir le nombre de tentatives de réessai et les exceptions que vous vouliez essayer et ce que vous ne voulez pas réessayer. * /

        retryTemplate.setRetryPolicy (getSimpleRetryPolicy ());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy () {
        Map <Class Boolean> exceptionMap = new HashMap <> ();
        exceptionMap.put (IllegalArgumentException.class, false);
        exceptionMap.put (TimeoutException.class, true);

        retourne le nouveau SimpleRetryPolicy (3, exceptionMap, true);
    }

} 

Merci.






Source link