Skip to content
fabric
Support formation Microsoft Fabric
  • Pages
    • Présentation du support
      • Cours DP-600
        • Untitled page
      • Cours DP-700
        • Configuring Access Control in Microsoft Fabric | DP-700 EXAM PREP (Video 4 of 11)
    • Organisation des formations Power BI & Fabric
    • Présentation de Fabric
      • Architectures
        • Architecture Médaillon
        • Les architectures de données
      • Tableau comparatif : Power BI standalone vs Microsoft Fabric
      • 36 questions à se poser pour démarrer sur Fabric
      • 10 choses à arrêter de faire / commencer à faire sur Fabric
      • 7 erreurs de capacités, espaces de travail et de contrôle d'accès dans Fabric
      • Guides de décision
    • Charges de travail Fabric
      • Stratégies d'actualisation
      • Lakehouse
        • Gestion du partage d'un lakehouse
        • Ingestion avec un notebook
      • Warehouse
        • Gestion du partage d'un warehouse
      • Pipeline (Data Factory)
      • Real time
      • Data Science
    • Feuille de route d'adoption de Microsoft Fabric
      • Synthèse
    • Administration de Fabric
      • Licences Fabric
      • Sécurité
      • Rôles dans les espaces de travail
      • Superviser et gérer
      • Paramètres du client (tenant settings) - Portail d’administration Power BI
    • Suivi des évolutions
    • Exercices
      • Ressources pédagogiques
      • Exercice GreenCycle (Dataflow Gen2)
      • Exercice Lakehouse
    • Domaines
    • Untitled page

Untitled page

# =============================================================================
# CONSOLIDATION MULTI-CLIENTS : Bronze → Silver
# =============================================================================
# Ce notebook fusionne les tables ERP de plusieurs lakehouses Bronze vers un
# lakehouse Silver centralisé. Il gère :
# - la lecture dynamique de la liste des tables depuis un CSV
# - l'ajout d'une colonne client_name sur chaque ligne
# - les différences de schéma entre clients (colonnes absentes → null)
# - trois stratégies d'écriture selon le type de clé :
# * ChampCléDate → overwrite de partition (idempotent, performant)
# * ChampCléID → MERGE Delta (upsert, sans doublon)
# * Aucune clé → overwrite complet
# =============================================================================
from pyspark.sql.functions import lit, col
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import time
# =============================================================================
# 1. CONFIGURATION
# =============================================================================
# Déclarez ici chaque client avec le nom de son lakehouse Bronze.
# Pour ajouter un 4e client, il suffit d'ajouter une entrée dans cette liste.
clients = [
{"client_name": "Client1", "dbo": "dboClient1"},
{"client_name": "Client2", "dbo": "dboClient2"},
{"client_name": "Client3", "dbo": "dboClient3"},
]
# Chemin du CSV dans la section Files du lakehouse Silver (lakehouse par défaut
# du notebook). Le CSV doit contenir trois colonnes :
# NomTable ; ChampCléDate ; ChampCléID
# Une seule des deux colonnes clé est renseignée à la fois (ou aucune).
CHEMIN_CSV = "Files/ListeTable.csv"
print("=== Démarrage de la consolidation multi-clients ===")
print(f" {len(clients)} clients configurés : {[c['client_name'] for c in clients]}")
start_total = time.time()

# =============================================================================
# 2. LECTURE DYNAMIQUE DE LA LISTE DES TABLES
# =============================================================================
print(f"\nLecture de {CHEMIN_CSV}...")
liste_tables_df = (
spark.read
.option("header", "true")
.option("delimiter", ";")
.csv(CHEMIN_CSV)
)
# On construit un dictionnaire de configuration par table.
# Les champs vides du CSV deviennent None pour faciliter les tests booléens.
tables_config = [
{
"NomTable": row["NomTable"],
"ChampCléDate": row["ChampCléDate"] if row["ChampCléDate"] else None,
"ChampCléID": row["ChampCléID"] if row["ChampCléID"] else None,
}
for row in liste_tables_df.collect()
if row["NomTable"] is not None
]
print(f" {len(tables_config)} tables à traiter : {[t['NomTable'] for t in tables_config]}")

# =============================================================================
# 3. FONCTIONS D'ÉCRITURE (une par stratégie)
# =============================================================================
def ecrire_overwrite_complet(df_final, table_silver):
"""
Stratégie pour les tables sans clé : on réécrit toute la table Silver.
Simple et fiable, mais coûteux si la table est volumineuse.
"""
(
df_final.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable(table_silver)
)
print(f" Écriture : overwrite complet → {table_silver}")

def ecrire_overwrite_partition(df_final, table_silver, champ_date):
"""
Stratégie recommandée pour ChampCléDate.
On calcule la plage de dates présente dans le nouveau lot Bronze, puis on
réécrit uniquement la ou les partitions concernées dans le Silver.
Idempotent : rejouer le notebook sur la même plage donne le même résultat.
"""
# Calcul de la plage couverte par le nouveau lot
valeur_min = df_final.agg(F.min(col(champ_date))).collect()[0][0]
valeur_max = df_final.agg(F.max(col(champ_date))).collect()[0][0]
if valeur_min is None:
print(f" Colonne {champ_date} entièrement null, basculement en overwrite complet.")
ecrire_overwrite_complet(df_final, table_silver)
return
# Colonne de partition au format YYYY-MM pour regrouper par mois.
# Vous pouvez changer "yyyy-MM" en "yyyy" (année) ou "yyyy-MM-dd" (jour)
# selon le volume quotidien de vos données.
df_final = df_final.withColumn(
"partition_mois",
F.date_format(col(champ_date), "yyyy-MM")
)
mois_min = str(valeur_min)[:7]
mois_max = str(valeur_max)[:7]
print(f" Plage détectée : {valeur_min}{valeur_max} (partitions {mois_min} à {mois_max})")
(
df_final.write
.format("delta")
.mode("overwrite")
# replaceWhere cible uniquement les partitions de la plage, sans toucher
# aux données historiques en dehors de cette fenêtre.
.option("replaceWhere", f"partition_mois >= '{mois_min}' AND partition_mois <= '{mois_max}'")
.option("mergeSchema", "true")
.partitionBy("partition_mois")
.saveAsTable(table_silver)
)
print(f" Écriture : overwrite partition [{mois_min}{mois_max}] → {table_silver}")

def ecrire_merge(df_final, table_silver, champ_id):
"""
Stratégie recommandée pour ChampCléID.
Le MERGE Delta compare chaque ligne entrante avec la table Silver sur la
clé ID + client_name. Si la ligne existe déjà, elle est mise à jour ;
sinon elle est insérée. Aucun doublon possible, même en cas de rejeu.
"""
if spark.catalog.tableExists(table_silver):
dt = DeltaTable.forName(spark, table_silver)
(
dt.alias("silver")
.merge(
df_final.alias("bronze"),
# La clé composite (champ_id + client_name) garantit l'unicité
# par client, même si deux clients partagent les mêmes IDs.
f"silver.{champ_id} = bronze.{champ_id} "
f"AND silver.client_name = bronze.client_name"
)
.whenMatchedUpdateAll() # met à jour les lignes modifiées
.whenNotMatchedInsertAll() # insère les nouvelles lignes
.execute()
)
print(f" Écriture : MERGE sur {champ_id} + client_name → {table_silver}")
else:
# Premier chargement : la table Silver n'existe pas encore
print(f" Table Silver absente, premier chargement complet par overwrite.")
ecrire_overwrite_complet(df_final, table_silver)

# =============================================================================
# 4. BOUCLE PRINCIPALE DE CONSOLIDATION
# =============================================================================
tables_ok = 0
tables_en_erreur = []
for i, config in enumerate(tables_config, start=1):
table_name = config["NomTable"]
champ_date = config["ChampCléDate"]
champ_id = config["ChampCléID"]
table_silver = f"{table_name}_unified"
# Détermination du mode pour ce tour de boucle
if champ_date:
mode_label = f"overwrite partition sur {champ_date}"
elif champ_id:
mode_label = f"MERGE sur {champ_id}"
else:
mode_label = "overwrite complet (aucune clé)"
print(f"\n--- [{i}/{len(tables_config)}] {table_name} | {mode_label} ---")
start_table = time.time()
# ------------------------------------------------------------------
# Lecture et filtrage incrémental depuis chaque lakehouse Bronze
# ------------------------------------------------------------------
dfs = []
for client in clients:
try:
df = spark.table(f"{client['dbo']}.{table_name}")
nb_brut = df.count()
# Pour les stratégies incrémentales, on ne lit que les nouvelles
# lignes en calculant d'abord le MAX présent dans le Silver.
champ_cle = champ_date or champ_id
if champ_cle and spark.catalog.tableExists(table_silver):
valeur_max_silver = (
spark.table(table_silver)
.agg(F.max(col(champ_cle)))
.collect()[0][0]
)
if valeur_max_silver is not None:
df = df.filter(col(champ_cle) > valeur_max_silver)
nb_filtre = df.count()
print(f" {client['client_name']} : {nb_brut} lignes totales "
f"→ {nb_filtre} nouvelles ({champ_cle} > {valeur_max_silver})")
else:
print(f" {client['client_name']} : {nb_brut} lignes (Silver vide, chargement complet)")
else:
print(f" {client['client_name']} : {nb_brut} lignes (chargement complet)")
df = df.withColumn("client_name", lit(client["client_name"]))
dfs.append(df)
except Exception as e:
print(f" {client['client_name']} : ERREUR — {e}")
tables_en_erreur.append({
"table": table_name,
"client": client["client_name"],
"erreur": str(e)
})
continue
if not dfs:
print(f" Aucune source disponible, table ignorée.")
continue
# ------------------------------------------------------------------
Want to print your doc?
This is not the way.
Try clicking the ··· in the right corner or using a keyboard shortcut (
CtrlP
) instead.