Skip to content
fabric
Support formation Microsoft Fabric
  • Pages
    • Présentation du support
      • Cours DP-600
        • Untitled page
    • Organisation des formations Power BI & Fabric
    • Présentation de Fabric
      • Architecture Médaillon
      • Tableau comparatif : Power BI standalone vs Microsoft Fabric
      • 36 questions à se poser pour démarrer sur Fabric
      • 10 choses à arrêter de faire / commencer à faire sur Fabric
      • 7 erreurs de capacités, espaces de travail et de contrôle d'accès dans Fabric
    • Feuille de route d'adoption de Microsoft Fabric
      • Synthèse
    • Guides de décision
    • Administration de Fabric
      • Licences Fabric
      • Sécurité
      • Rôles dans les espaces de travail
      • Superviser et gérer
      • Paramètres du client (tenant settings) - Portail d’administration Power BI
    • Suivi des évolutions
    • Exercices
      • Ressources pédagogiques
      • Exercice GreenCycle (Dataflow Gen2)
      • icon picker
        Exercice Lakehouse
    • Domaines

Exercice Lakehouse

Contexte

Vous êtes data engineer dans une entreprise de vente en ligne. Vous disposez de fichiers CSV contenant les commandes clients des trois dernières années (2022, 2023, 2024), déposés dans le dossier Files/commandes/ de votre lakehouse Fabric. Votre mission : charger ces données, les nettoyer, les transformer et les stocker sous forme de table Delta interrogeable en SQL.

Prérequis

Un espace de travail Microsoft Fabric avec une capacité active (essai ou Premium).
Un lakehouse créé dans cet espace de travail.
Un notebook Fabric attaché au lakehouse.

Données d’exemple

Chaque fichier CSV (2022.csv, 2023.csv, 2024.csv) contient les colonnes suivantes :
NumeroCommande,LigneCommande,DateCommande,NomClient,Email,Produit,Quantite,PrixUnitaire,Taxe
SO001,1,2022-01-15,Marie Dupont,marie@example.com,Clavier,2,45.99,9.20
SO001,2,2022-01-15,Marie Dupont,marie@example.com,Souris,1,25.50,5.10
SO002,1,2022-01-18,Jean Martin,jean@example.com,Ecran,1,299.00,59.80
ExoLakehouse.zip
10 kB

Étape 1 : charger un fichier CSV dans un dataframe

La commande de base pour lire un fichier CSV dans un dataframe Spark :
# Charger un seul fichier CSV
df = spark.read.format("csv").option("header", True).load("Files/commandes/2022.csv")

# Afficher les premières lignes
display(df)
Le paramètre header=True indique que la première ligne du fichier contient les noms de colonnes.
A faire : exécutez ce code dans une cellule de votre notebook. Observez les types de données détectés : par défaut, toutes les colonnes sont de type string.

Etape 2 : charger plusieurs fichiers CSV avec un schéma explicite

Pour charger tous les fichiers du dossier et forcer les types de données corrects :
from pyspark.sql.types import *

# Définir le schéma explicitement
schema_commandes = StructType([
StructField("NumeroCommande", StringType()),
StructField("LigneCommande", IntegerType()),
StructField("DateCommande", DateType()),
StructField("NomClient", StringType()),
StructField("Email", StringType()),
StructField("Produit", StringType()),
StructField("Quantite", IntegerType()),
StructField("PrixUnitaire", FloatType()),
StructField("Taxe", FloatType())
])

# Charger tous les CSV du dossier avec le wildcard *
df = spark.read.format("csv") \
.schema(schema_commandes) \
.option("header", True) \
.load("Files/commandes/*.csv")

# Vérifier le nombre de lignes chargées
print(f"Nombre de lignes : {df.count()}")

# Vérifier le schéma
df.printSchema()

# Afficher un échantillon
display(df.limit(10))
A faire : comparez le résultat de printSchema() avec et sans schéma explicite. Que constatez-vous sur les types de la colonne Quantite ?

Etape 3 : explorer les données

Quelques commandes utiles pour explorer rapidement les données chargées :
# Nombre de lignes et de colonnes
print(f"Lignes : {df.count()}, Colonnes : {len(df.columns)}")

# Statistiques descriptives sur les colonnes numériques
display(df.describe())

# Compter les valeurs distinctes d'une colonne
from pyspark.sql.functions import countDistinct
df.select(countDistinct("NomClient").alias("NbClients")).show()

# Vérifier les valeurs nulles par colonne
from pyspark.sql.functions import col, sum as spark_sum
display(
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
)
A faire : exécutez chaque bloc. Combien de clients distincts avez-vous ? Y a-t-il des valeurs nulles ?

Etape 4 : nettoyer et transformer les données

4.1. Supprimer les doublons

# Supprimer les lignes en double
df_clean = df.dropDuplicates()
print(f"Avant : {df.count()} lignes, Après : {df_clean.count()} lignes")

4.2. Supprimer les lignes avec des valeurs nulles sur les colonnes clés

# Supprimer les lignes où NumeroCommande ou DateCommande est null
df_clean = df_clean.dropna(subset=["NumeroCommande", "DateCommande"])

4.3. Ajouter des colonnes calculées

from pyspark.sql.functions import col, year, month, quarter, round

# Ajouter le montant total de la ligne
df_clean = df_clean.withColumn(
"MontantLigne",
round((col("PrixUnitaire") * col("Quantite")) + col("Taxe"), 2)
)

# Extraire l'année, le trimestre et le mois de la date de commande
df_clean = df_clean.withColumn("Annee", year(col("DateCommande")))
df_clean = df_clean.withColumn("Trimestre", quarter(col("DateCommande")))
df_clean = df_clean.withColumn("Mois", month(col("DateCommande")))

display(df_clean.limit(5))

4.4. Renommer une colonne

# Renommer une colonne
df_clean = df_clean.withColumnRenamed("PrixUnitaire", "PrixUnit")

4.5. Filtrer les données

# Ne garder que les commandes de 2023 et après
df_recent = df_clean.filter(col("Annee") >= 2023)
print(f"Commandes depuis 2023 : {df_recent.count()} lignes")
A faire : ajoutez une colonne Remise qui vaut 10 % du montant de la ligne si la quantité est supérieure ou égale à 5, et 0 sinon. Indice : utilisez when et otherwise.

Etape 5 : sauvegarder en table Delta dans le lakehouse

5.1. Créer une table Delta (écriture complète)

# Sauvegarder le dataframe comme table Delta dans le lakehouse
df_clean.write.mode("overwrite").format("delta").saveAsTable("commandes")
mode("overwrite") : remplace la table si elle existe déjà.
format("delta") : utilise le format Delta Lake (versionnement, transactions ACID).
saveAsTable("commandes") : crée la table dans la section Tables du lakehouse.

5.2. Ajouter des données à une table existante (append)

# Ajouter de nouvelles données à la table existante
df_nouvelles.write.mode("append").format("delta").saveAsTable("commandes")

5.3. Sauvegarder avec partitionnement

# Partitionner par année et trimestre pour optimiser les requêtes
df_clean.write.mode("overwrite") \
.format("delta") \
.partitionBy("Annee", "Trimestre") \
.saveAsTable("commandes_partitionnees")
A faire : sauvegardez votre dataframe nettoyé en table Delta nommée commandes. Puis vérifiez que la table apparaît dans la section Tables de l’explorateur du lakehouse.

Etape 6 : interroger la table Delta en SQL

Une fois la table créée, vous pouvez l’interroger directement en SQL Spark dans le notebook :

6.1. Requête de base

%%sql
SELECT * FROM commandes LIMIT 10

6.2. Agrégation : chiffre d’affaires par année

%%sql
SELECT
Annee,
COUNT(DISTINCT NumeroCommande) AS NbCommandes,
ROUND(SUM(MontantLigne), 2) AS ChiffreAffaires
FROM commandes
GROUP BY Annee
ORDER BY Annee

6.3. Top 5 des produits les plus vendus

%%sql
SELECT
Produit,
SUM(Quantite) AS QuantiteTotale,
ROUND(SUM(MontantLigne), 2) AS CATotal
FROM commandes
GROUP BY Produit
ORDER BY QuantiteTotale DESC
LIMIT 5

6.4. Requête SQL dans du code PySpark

# Exécuter une requête SQL et récupérer le résultat dans un dataframe
df_resultats = spark.sql("""
SELECT
Want to print your doc?
This is not the way.
Try clicking the ··· in the right corner or using a keyboard shortcut (
CtrlP
) instead.