LE FLUX DE FLOCON DE NEIGE
FLUX:
Stream est une méthodologie CHANGE DATA CAPTURE dans snowflake, elle enregistre les modifications DML apportées aux tables, y compris (Insérer/Mettre à jour/supprimer). Lorsqu’un flux est créé pour une table, il créera une paire de colonnes masquées pour suivre les métadonnées.
créer ou remplacer le flux s_emp sur la table emp append_only=false ;
J’ai deux tables emp et emp_hist , emp est ma table source et emp_hist sera ma cible.
Je vais maintenant insérer une nouvelle ligne dans ma table source pour capturer les données de mon flux.
Voyons le résultat de notre flux.
De la même manière, je vais supprimer et faire une mise à jour sur ma table source.
J’ai supprimé un enregistrement et j’ai effectué une mise à jour consécutive. Mais ici, dans le flux, nous avons pu voir deux actions supprimées.
- La première action de suppression concernait la ligne que j’ai supprimée et la seconde concerne la ligne que j’ai mise à jour.
- Si la ligne a été supprimée de la source, le flux capturera METADATA$ACTION comme DELETE et METADATA@ISUPDATE comme FALSE.
- Si la ligne a été mise à jour dans la source, le flux capturera à la fois les actions de suppression et d’insertion, de sorte qu’il capturera l’ancienne ligne en tant que suppression et capturera la ligne mise à jour en tant qu’insertion.
Créez une requête de fusion pour stocker les données du flux dans la table finale :
J’utilise la requête de fusion ci-dessous pour capturer l’enregistrement nouvellement inséré et mis à jour (SsCD1) dans ma table finale.
fusionner dans emp_hist t1
en utilisant (select * from s_emp où not(METADATA$ACTION=’DELETE’ and METADATA$ISUPDATE=’TRUE’) ) t2
sur t1.emp_id=t2.emp_id
en cas de correspondance et t2.METADATA$ACTION=’DELETE’ et METADATA$ISUPDATE=’FALSE’ puis supprimez
en cas de correspondance et t2.METADATA$ACTION=’INSERT’ et METADATA$ISUPDATE=’TRUE’
puis mettez à jour l’ensemble t1.emp_name=t2.emp_name, t1.location=t2.location
quand il n’y a pas de correspondance, alors
insérer (emp_id,emp_name,location) valeurs (t2.emp_id,t2.emp_name,t2.location);
Requête pour SCD2 :
COMMENCER;
mettre à jour empl_hist t1
définir t1.emp_name=t2.emp_name , t1.location=t2.location,t1.end_date=current_timestamp :: timestamp_ntz
from (select emp_id,emp_name,location from s_empl où METADATA$ACTION=’DELETE’) t2
où t1.emp_id=t2.emp_id ;
insérer dans empl_hist, sélectionner t2.emp_id, t2.emp_name, t2.location, current_timestamp, NULL
from s_empl t2 où t2.METADATA$ACTION=’INSERT’;
commettre;
Tâches
Les tâches utilisent des fonctions définies par l’utilisateur pour automatiser et planifier les processus métier. Avec une seule tâche, vous pouvez exécuter une fonction simple ou complexe dans votre pipeline de données.
J’ai créé une tâche pour la requête de fusion mentionnée ci-dessus au lieu d’exécuter cette requête à chaque fois que nous pouvons créer manuellement une tâche. J’ai ici ajouté une condition system$stream_has_data(’emp_s’) dans la création de ma tâche. ainsi, si les données sont disponibles dans le flux, la tâche s’exécutera et la chargera dans la table cible, sinon la tâche sera ignorée.
créer une tâche mytask Warehouse=compute_wh
planning=’1 minute’ quand
système$stream_has_data(’emp_s’)
comme fusionner dans emp_hist t1
en utilisant (select * from emp_s Where not(METADATA$ACTION=’DELETE’ and METADATA$ISUPDATE=’TRUE’) ) t2
sur t1.emp_id=t2.emp_id
en cas de correspondance et t2.METADATA$ACTION=’DELETE’ et METADATA$ISUPDATE=’FALSE’ puis supprimez
en cas de correspondance et t2.METADATA$ACTION=’INSERT’ et METADATA$ISUPDATE=’TRUE’
puis mettez à jour l’ensemble t1.emp_name=t2.emp_name, t1.location=t2.location
quand il n’y a pas de correspondance, alors
insérer (emp_id,emp_name,location) valeurs (t2.emp_id,t2.emp_name,t2.location);
Source link