Fermer

janvier 16, 2024

Interrogation et transformation des données Snowflake et des fichiers S3 à l’aide d’AWS Glue Python Shell / Blogs / Perficient

Interrogation et transformation des données Snowflake et des fichiers S3 à l’aide d’AWS Glue Python Shell / Blogs / Perficient


Introduction

AWS Glue est un service ETL entièrement géré d’AWS qui offre la flexibilité de travailler à la fois avec les tables Snowflake et les fichiers S3. Le shell AWS Glue Python permet d’utiliser des bibliothèques python supplémentaires (dans notre cas, snowflake-connector-python) qui aident à récupérer les données de snowflake et à exécuter les requêtes directement dans Snowflake avec l’entrepôt virtuel Snowflake. Ainsi, grâce à la prise en charge de Python, à la nature sans serveur et à l’utilisation des ressources Snowflake, ETL a été rendu plus simple et plus puissant avec AWS Glue et Snowflake.

Conditions préalables

  • Créez un rôle IAM avec une stratégie IAM attachée avec les autorisations nécessaires pour accéder à Snowflake.
  • Créez une intégration de stockage dans Snowflake qui stocke les détails IAM et accordez à l’utilisateur IAM les autorisations pour accéder aux objets du compartiment.
  • Créez une étape externe Snowflake qui fait référence à l’intégration de stockage qui a été créée.

https://docs.snowflake.com/en/user-guide/data-load-s3-config

  • Créez les objets nécessaires dans Snowflake.
create database Loans;
create schema Fixed_loans;
Create or replace table EMI_Calculator(
Loan_ID number,
Loan_type string,
Tenure number,
Tenure_in_months number as (Tenure * 12),
Loan_Amount number,
Interest_rate number,
EMI number,
Total_payment number as (EMI * Tenure_in_months),
Total_interest_payable number as (Total_payment-Loan_Amount));

Illustrons par un exemple,

  • Considérons un simple flux ETL comme ci-dessous.
  • L’application client place le fichier source brut avec les détails du prêt dans le compartiment S3.
  • EMI est calculé à l’aide de scripts Python et les données sont chargées dans une table Snowflake qui contient des expressions de colonne pour des calculs simples.
  • Ensuite, les données sont lues à partir du tableau en flocon de neige et exportées dans des fichiers distincts pour différents types de prêts.
  • Considérons le fichier csv ci-dessous comme source pour notre exemple.

Image 1

Créer un shell Python dans AWS Glue

  • Dans la console AWS Glue, cliquez sur les tâches ETL et sous Créer une tâche, sélectionnez l’éditeur de script et créez une nouvelle tâche Glue.

Image2

  • Une fois le travail Glue créé, sous Détails du travail -> Ajoutez la clé et la valeur ci-dessous dans les paramètres du travail sous Propriétés avancées.

Image3

  • Importez ensuite les bibliothèques requises, snowflake.connector (pour interroger les tables Snowflake) et boto3 (pour gérer les fichiers S3).
import sys
import snowflake.connector
import boto3
import io
import pandas as pd
  • Créez une connexion Snowflake, un curseur Snowflake et un client S3. (Pour des raisons de sécurité, les utilisateurs peuvent stocker les informations d’identification dans AWS Secrets Manager au lieu d’utiliser directement les informations d’identification dans le code)
#Create snowflake connection
conn = snowflake.connector.connect(
    user=<user_name>,
    password=<user_password>,
    account=<account_identifier>,
    warehouse="wh_1",
    database="Loans",
    schema="Fixed_loans",
    role="sysadmin’)

#Create snowflake cursor    
cur = conn.cursor()

#Create s3 client
s3_client = boto3.client("s3')

Cas 1 : importation d’un fichier s3 dans Snowflake

  • Obtenez les données du fichier source brut à l’aide de s3_client et lisez les données sous forme de trame de données pandas.
#read the source file
obj = s3_client.get_object(Bucket = Bucket_name,Key = 'data_in/Loan_data.csv')
df = pd.read_csv(io.BytesIO(obj['Body'].read()),header=0)
  • En utilisant pandas et d’autres bibliothèques Python, les transformations de données nécessaires peuvent être effectuées. Ici, l’IME est calculé sur la base des détails du prêt.
df['EMI'] = df['Amount'] * (df['Interest_rate']/1200) * pow((1+(df['Interest_rate']/1200)),(df['Tenure']*12)) /( pow((1+(df['Interest_rate']/1200)),(df['Tenure']*12)) -1 )
  • Placez la trame de données traitée dans le compartiment s3 au format de fichier recommandé.
csv_buffer = io.StringIO()
df.to_csv(csv_buffer,index=False)
s3_client.put_object(Body=csv_buffer.getvalue(), Bucket = Bucket_name, Key = output_key)
  • Dans notre exemple, il y a quelques colonnes virtuelles supplémentaires dans la table cible de snowflake. Pour charger les données dans les colonnes correctes, le fichier traité est interrogé à l’aide de SÉLECTIONNER commande et chargé dans la table finale en utilisant COPIER DANS commande comme ci-dessous,
COPY INTO TABLE_NAME(COLUMNS) FROM (SELECT $1,$2,.. FROM @STAGE/FILE) FILE_FORMAT = ‘CSV_FF’
  • Les codes ci-dessous généreront un code SQL au format ci-dessus.
#Assigning variables
Table_name="Loans.Fixed_loans.EMI_Calculator"
File_format="Loans.Fixed_loans.csv_ff"
stage="Loans.Fixed_loans.my_s3_stage"

#Column names were taken from Snowflake
cur.execute(f'DESCRIBE TABLE {Table_name}')
result = cur.fetchall()
result_df = pd.DataFrame(result,columns=[column[0] for column in cur.description])
new_df = result_df[result_df['kind']=='COLUMN']
column_list = list(new_df['name'])
col=",".join(column_list)

#generating the sequence like $1,$2,$3...
select_query_list =[]
for i in range(1,len(column_list)+1):
    select_query_list.append('$'+str(i))
select_query = ','.join(select_query_list)

#executing copy command
cur.execute(f"COPY INTO {Table_name}({col}) FROM (SELECT {select_query} FROM @{stage}/{output_key}) FILE_FORMAT = '{File_format}'")
  • Une fois le travail Glue exécuté, nous avons pu voir que les données sont chargées dans la table Snowflake comme prévu.

Image6

Cas 2 : exporter les données de la table Snowflake vers le fichier s3

  • Considérons un scénario pour exporter des données dans des fichiers séparés pour différents types de prêts.
  • La requête peut être exécutée avec le curseur et en utilisant les fonctions fetchall(),fetchmany() et fetchone(), les données requises peuvent être exportées vers le compartiment S3 à partir de snowflake comme ci-dessous.
for loan_type in ['Housing','Vehicle','Personal']:
    export_query = f"select * from Loans.Fixed_loans.EMI_Calculator where loan_type="{loan_type}""
    print(export_query)
    cur.execute(export_query)
    result = cur.fetchall()
    df = pd.DataFrame(result,columns=[column[0] for column in cur.description])
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer,index=False)
    output_key = f'data_out/result/{loan_type}.csv'
    s3_client.put_object(Body=csv_buffer.getvalue(), Bucket = Bucket_name, Key = output_key)
  • Une autre façon d’exporter les fichiers consistera à utiliser les fonctionnalités de déchargement de flocons de neige, COPIER DANS avec requis FORMAT DE FICHIER.
  • Snowflake prend en charge le déchargement de données dans trois formats. c’est-à-dire délimité, JSON ou Parquet.
  • La capture d’écran ci-dessous montre le fichier exporté pour la requête ci-dessus.

Image4

Recommandations

  • Créez un flux de travail avec des déclencheurs pour exécuter les tâches dans n’importe quel ordre, parallèle ou séquentiel. Le déclencheur peut être créé à la demande ou planifié à l’aide d’expressions CRON en fonction des exigences.
  • Augmentez les unités de traitement de données à 1DPU pour gérer de grands ensembles de données.

Conclusion

Le client peut envisager d’utiliser cette méthode s’il utilise le compartiment S3 pour le stockage des données. Car cela offrira plus de flexibilité pour utiliser les fonctions boto3 avec d’autres bibliothèques Python et fonctionnalités Snowflake dans le même flux ETL. L’utilisation d’AWS Glue pour ETL nous offre également la possibilité d’utiliser d’autres services AWS tels que SNS, KMS et plus encore pour des applications étendues et des options de sécurité supplémentaires.






Source link