Skip to content

Exercice Lakehouse

Contexte

Vous êtes data engineer dans une entreprise de vente en ligne. Vous disposez de fichiers CSV contenant les commandes clients des trois dernières années (2022, 2023, 2024), déposés dans le dossier Files/commandes/ de votre lakehouse Fabric. Votre mission : charger ces données, les nettoyer, les transformer et les stocker sous forme de table Delta interrogeable en SQL.

Prérequis

Un espace de travail Microsoft Fabric avec une capacité active (essai ou Premium).
Un lakehouse créé dans cet espace de travail.
Un notebook Fabric attaché au lakehouse.

Données d’exemple

Chaque fichier CSV (2022.csv, 2023.csv, 2024.csv) contient les colonnes suivantes :
NumeroCommande,LigneCommande,DateCommande,NomClient,Email,Produit,Quantite,PrixUnitaire,Taxe
SO001,1,2022-01-15,Marie Dupont,marie@example.com,Clavier,2,45.99,9.20
SO001,2,2022-01-15,Marie Dupont,marie@example.com,Souris,1,25.50,5.10
SO002,1,2022-01-18,Jean Martin,jean@example.com,Ecran,1,299.00,59.80
ExoLakehouse.zip
9.8 KB

Étape 1 : charger un fichier CSV dans un dataframe

La commande de base pour lire un fichier CSV dans un dataframe Spark :
# Charger un seul fichier CSV
df = spark.read.format("csv").option("header", True).load("Files/commandes/2022.csv")

# Afficher les premières lignes
display(df)
Le paramètre header=True indique que la première ligne du fichier contient les noms de colonnes.
A faire : exécutez ce code dans une cellule de votre notebook. Observez les types de données détectés : par défaut, toutes les colonnes sont de type string.

Etape 2 : charger plusieurs fichiers CSV avec un schéma explicite

Pour charger tous les fichiers du dossier et forcer les types de données corrects :
from pyspark.sql.types import *

# Définir le schéma explicitement
schema_commandes = StructType([
StructField("NumeroCommande", StringType()),
StructField("LigneCommande", IntegerType()),
StructField("DateCommande", DateType()),
StructField("NomClient", StringType()),
StructField("Email", StringType()),
StructField("Produit", StringType()),
StructField("Quantite", IntegerType()),
StructField("PrixUnitaire", FloatType()),
StructField("Taxe", FloatType())
])

# Charger tous les CSV du dossier avec le wildcard *
df = spark.read.format("csv") \
.schema(schema_commandes) \
.option("header", True) \
.load("Files/commandes/*.csv")

# Vérifier le nombre de lignes chargées
print(f"Nombre de lignes : {df.count()}")

# Vérifier le schéma
df.printSchema()

# Afficher un échantillon
display(df.limit(10))
A faire : comparez le résultat de printSchema() avec et sans schéma explicite. Que constatez-vous sur les types de la colonne Quantite ?

Etape 3 : explorer les données

Quelques commandes utiles pour explorer rapidement les données chargées :
# Nombre de lignes et de colonnes
print(f"Lignes : {df.count()}, Colonnes : {len(df.columns)}")

# Statistiques descriptives sur les colonnes numériques
display(df.describe())

# Compter les valeurs distinctes d'une colonne
from pyspark.sql.functions import countDistinct
df.select(countDistinct("NomClient").alias("NbClients")).show()

# Vérifier les valeurs nulles par colonne
from pyspark.sql.functions import col, sum as spark_sum
display(
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
)
A faire : exécutez chaque bloc. Combien de clients distincts avez-vous ? Y a-t-il des valeurs nulles ?

Etape 4 : nettoyer et transformer les données

4.1. Supprimer les doublons

# Supprimer les lignes en double
df_clean = df.dropDuplicates()
print(f"Avant : {df.count()} lignes, Après : {df_clean.count()} lignes")

4.2. Supprimer les lignes avec des valeurs nulles sur les colonnes clés

# Supprimer les lignes où NumeroCommande ou DateCommande est null
df_clean = df_clean.dropna(subset=["NumeroCommande", "DateCommande"])

4.3. Ajouter des colonnes calculées

from pyspark.sql.functions import col, year, month, quarter, round

# Ajouter le montant total de la ligne
df_clean = df_clean.withColumn(
"MontantLigne",
round((col("PrixUnitaire") * col("Quantite")) + col("Taxe"), 2)
)

# Extraire l'année, le trimestre et le mois de la date de commande
df_clean = df_clean.withColumn("Annee", year(col("DateCommande")))
df_clean = df_clean.withColumn("Trimestre", quarter(col("DateCommande")))
df_clean = df_clean.withColumn("Mois", month(col("DateCommande")))

display(df_clean.limit(5))

4.4. Renommer une colonne

# Renommer une colonne
df_clean = df_clean.withColumnRenamed("PrixUnitaire", "PrixUnit")
Want to print your doc?
This is not the way.
Try clicking the ··· in the right corner or using a keyboard shortcut (
CtrlP
) instead.