Fermer

décembre 26, 2022

Fusion dynamique dans Snowflake à l’aide d’une procédure stockée et de Python – Pour SCD de type 1

Fusion dynamique dans Snowflake à l’aide d’une procédure stockée et de Python – Pour SCD de type 1


Créer un compte Snowflake

Nous pouvons créer un compte d’essai gratuit de 30 jours en utilisant le lien ci-dessous.

https://signup.snowflake.com/

DSC Type 1

En bref, conserve uniquement les dernières données les plus récentes et les anciennes données sont écrasées.

SCD Type 1 en flocon de neige

Considérons que nous avons un scénario pour charger les données dans la table cible à partir de la table source.

Nous pouvons utiliser une instruction MERGE normale comme ci-dessous pour soit METTRE À JOUR un enregistrement (s’il existe déjà basé sur la colonne clé) INSÉRER cet enregistrement.

Fusionner

Fusion dynamique dans Snowflake

Lorsque nous devons traiter plusieurs instructions MERGE, au lieu d’écrire MERGE plusieurs fois, nous pouvons tirer parti de la procédure stockée. Parallèlement à cela, utilisons une table intermédiaire (nous pouvons l’appeler ‘Tableau de la carte‘) qui contient le mappage au niveau du champ dans les tables source/cible.

Comment effectuer une fusion dynamique ? (SCD 1)

Définissez vos tables source – EMP_STAGE et cible – EMPLOYEE comme ci-dessous.

La table EMP_STAGE ressemble à ci-dessous.

Tableau EMP_STAGE

Le tableau des employés ressemble à ci-dessous.

6 Avant

Vous trouverez ci-dessous à quoi ressemblera la table des employés une fois la fusion terminée.

EMP_ID 1 est mis à jour tandis que EMP_ID 9 est inséré ici à partir de EMP_STAGE.

3 Résultats attendus

Tableau CARTE

Définissez votre mappage au niveau du champ entre la source et la cible dans le tableau des cartes comme ci-dessous.

0 Tableau des cartes

KEY_FIELD – Y désigne le champ clé qui peut être utilisé pour joindre/comparer entre 2 tables tandis que KEY_FIELD – N signifie qu’il s’agit d’un champ non clé.

Notre scénario: La table source est EMP_STAGE tandis que la table cible est EMPLOYEE avec l’ID de colonne 1 – ID d’employé comme seul champ clé. Donc, avez marqué ‘Y’ dans KEY_FIELD.

Création d’une procédure stockée

J’ai utilisé Python comme langage de script et utilisé le cadre de données ‘Pandas’. Nous devons déclarer la version, les packages et le nom de la fonction ‘Cours’ dans le gestionnaire. Le gestionnaire marque comme point d’entrée pour notre programme qui est défini après cela en utilisant ‘def run’.

Je viens d’utiliser 2 paramètres ici pour la procédure stockée – un pour la source et un pour le nom de la table cible.

CREATE OR REPLACE PROCEDURE AUTO_MERGE(S_TABLE STRING, T_TABLE STRING) 
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','pandas')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.functions import col, lit
from snowflake.snowpark.types import StringType
import pandas as pd

def run(session, s_tab, t_tab):

REMARQUE: Pour utiliser Pandas, vous devez d’abord l’activer en suivant les étapes mentionnées dans documentation sur les flocons de neige sinon, vous obtenez un message d’erreur pour activer le package Anaconda. S’il vous plaît laissez-moi savoir si vous avez encore besoin d’aide ici.

Stocké Logique de procédure

Étape 1 est d’obtenir la condition « ON » dans l’instruction MERGE qui consiste à interroger le KEY_FIELD – Y dans la table MAP en fonction des noms de table source/cible des paramètres. Stockez ensuite le jeu de résultats dans Snowflake Dataframe – df (qui ressemblera à une table normale).

def run(session, s_tab, t_tab):
    df = session.sql(f'''
    SELECT '{s_tab}'||'.'||S_FIELD||'='||'{t_tab}'||'.'||T_FIELD as FILTERS FROM TEST.PYTHON.MAP_TABLE WHERE S_TABLE='{s_tab}' AND T_TABLE='{t_tab}' AND KEY_FIELD='Y'
    ''')
    cc = df.count();
    var_filter="";
    df1 = df.to_pandas()
    df1['colB'] = " AND ";
    df1.iloc[cc-1, 1] = " "
    df1['FILTER'] = df1['FILTERS']+df1['colB']
    df2 = df1['FILTER']
    arr = df2.to_numpy()
    arr_key_fields="".join(arr);

Deuxièmement, je reçois le nombre de données et je le convertis en Cadre de données Pandas (df1) pour tirer parti des avantages de Pandas.

J’ai créé une nouvelle colonne ‘colB’ dans le dataframe pour stocker la valeur ‘AND’. La ligne suivante-iloc est utilisé pour ajouter un blanc ‘ ‘ dans colB pour le dernier enregistrement. Ensuite, j’ai concaténé les noms de champs avec ‘ AND ‘ (le dernier enregistrement n’aura pas ‘AND’) et je les ai convertis en une série, puis en un tableau numpy suivi d’un espace. De cette façon, vous pouvez créer toutes les chaînes qui étaient présentes dans plusieurs lignes en une seule chaîne pour créer le À condition pour l’instruction MERGE.

Remarque : ‘ AND ‘ est utilisé comme séparateur pour plusieurs conditions sauf la dernière

Exemple: Considérez ci-dessus 4 lignes et il peut être converti en une seule chaîne comme « Hey comment allez-vous? » en utilisant l’ensemble de code ci-dessus.

Étape 2 est de construire le mettre à jour les champs nécessaire pour MERGE. Presque le même processus mais cette fois je dois filtrer la requête basée sur KEY_FIELD ‘N’ pour mettre à jour les champs non clés dans MERGE.

df_update_fields = session.sql(f'''
SELECT '{t_tab}'||'.'||T_FIELD||'='||'{s_tab}'||'.'||S_FIELD as FILTERS FROM TEST.PYTHON.MAP_TABLE E WHERE S_TABLE='{s_tab}' AND T_TABLE='{t_tab}' AND KEY_FIELD='N'
''')
c = df_update_fields.count();
var_filter="";
df1_update_fields = df_update_fields.to_pandas()
df1_update_fields['colB'] = ", ";
df1_update_fields.iloc[c-1, 1] = " "
df1_update_fields['FILTER'] = df1_update_fields['FILTERS']+df1_update_fields['colB']
df2_update_fields = df1_update_fields['FILTER']
arr_update_fields = df2_update_fields.to_numpy()
arr_update_fields1 = ''.join(arr_update_fields);

Une autre différence est que j’ai utilisé une virgule comme séparateur entre différents champs. Donc, j’ai utilisé la colonne ici colB avec une virgule plutôt que ET comparant l’instantané précédent pour la condition ON.

Étape 3 est de construire le insérer des champs nécessaire pour MERGE. C’est presque le même processus que ci-dessus mais ici j’essaie d’obtenir 2 colonnes de la MAP_TABLE sans filtres.

df_temp = session.sql(f'''
SELECT T_FIELD, '{s_tab}'||'.'||S_FIELD AS SOURCE FROM TEST.PYTHON.MAP_TABLE WHERE S_TABLE='{s_tab}' AND T_TABLE='{t_tab}' ORDER BY COLUMN_ID
''')
total=df_temp.count()
df_insert=df_temp.to_pandas()

df_insert['colB'] = ", ";
df_insert.iloc[total-1, 2] = " "
df_insert['SRC'] = df_insert['SOURCE']+df_insert['colB']
df_insert['TGT'] = df_insert['T_FIELD']+df_insert['colB']
df_tgt = df_insert['TGT']
df_src = df_insert['SRC']
arr_tgt_col = df_tgt.to_numpy()
arr_tgt="".join(arr_tgt_col);
arr_src_col = df_src.to_numpy()
arr_src="".join(arr_src_col);

Étape 4 est de construire les chaînes ON, mettre à jour et insérer que nous avons construites jusqu’à présent dans une chaîne d’instruction MERGE appropriée comme ci-dessous dans la chaîne mrg_sql. Ensuite, exécutez-le en affichant enfin le merge sql avec un message de réussite

    mrg_sql = "MERGE INTO "
    mrg_sql = mrg_sql + f"{t_tab}" + " USING " + f"{s_tab}" 
    mrg_sql = mrg_sql + " ON " + f"{arr_key_fields}" 
    mrg_sql = mrg_sql + "WHEN MATCHED THEN UPDATE SET "
    mrg_sql = mrg_sql + f"{arr_update_fields1}"
    mrg_sql = mrg_sql + "WHEN NOT MATCHED THEN INSERT ("
    mrg_sql = mrg_sql + f"{arr_tgt}" + ") VALUES ("
    mrg_sql = mrg_sql + f"{arr_src}" + ")"
    
    df = session.sql(f'''
    {mrg_sql}
    ''')
    df.collect()
    return f"{mrg_sql} is the MERGE QUERY and ran successfully"
$$;

Étape 5 consiste à appeler la procédure stockée avec les tables source et cible comme paramètres.

APPELER AUTO_MERGE(‘EMP_STAGE’,’EMPLOYEE’);

Production

MERGE INTO EMPLOYEE USING EMP_STAGE ON EMP_STAGE.E_ID=EMPLOYEE.EMP_ID WHEN MATCHED THEN UPDATE SET EMPLOYEE.EMP_NAME=EMP_STAGE.E_NAME, EMPLOYEE.LOCATION_ID=EMP_STAGE.LOCATION, EMPLOYEE.DEP_ID=EMP_STAGE.DEP_ID, EMPLOYEE.SALARY=EMP_STAGE.SALARY, EMPLOYEE.COMMENTS=EMP_STAGE.COMMENTS, EMPLOYEE.GENDER=EMP_STAGE.GENDER WHEN NOT MATCHED THEN INSERT (EMP_ID, EMP_NAME, LOCATION_ID, DEP_ID, SALARY, COMMENTS, GENDER ) VALUES (EMP_STAGE.E_ID, EMP_STAGE.E_NAME, EMP_STAGE.LOCATION, EMP_STAGE.DEP_ID, EMP_STAGE.SALARY, EMP_STAGE.COMMENTS, EMP_STAGE.GENDER ) is the MERGE QUERY and ran successfully

Avant l’exécution – Instantané du tableau

6 Avant

Après l’exécution – Instantané du tableau

6 Après

Ici, 1 enregistrement (EMP_ID : 9) est inséré et un autre enregistrement (EMP_ID : 1) est mis à jour.

J’espère que vous avez eu un aperçu de la façon dont nous pouvons mélanger la procédure stockée avec Python et Dataframe dans Snowflake pour fusionner automatiquement les tables dont nous avons besoin à notre aise.

Reportez-vous à la documentation officielle de Snowflake ici pour apprendre plus.

Veuillez partager vos réflexions et suggestions dans l’espace ci-dessous et je ferai de mon mieux pour y répondre dans la mesure du temps disponible.

Bon apprentissage!!






Source link

décembre 26, 2022