Skip to content

Consolidation multi-clients : Bronze vers Silver

# =============================================================================
# CONSOLIDATION MULTI-CLIENTS : Bronze → Silver
# =============================================================================
# Ce notebook fusionne les tables de plusieurs lakehouses Bronze vers un
# lakehouse Silver centralisé. Il gère :
# - la lecture dynamique de la liste des tables depuis un CSV
# - l'ajout d'une colonne client_name sur chaque ligne
# - les différences de schéma entre clients (colonnes absentes → null)
# - trois stratégies d'écriture selon le type de clé :
# * ChampCléDate → overwrite de partition (idempotent, performant)
# * ChampCléID → MERGE Delta (upsert, sans doublon)
# * Aucune clé → overwrite complet
# =============================================================================
from pyspark.sql.functions import lit, col
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import time
# =============================================================================
# 1. CONFIGURATION
# =============================================================================
# Déclarez ici chaque client avec le nom de son lakehouse Bronze.
# Pour ajouter un 4e client, il suffit d'ajouter une entrée dans cette liste.
clients = [
{"client_name": "Client1", "dbo": "dboClient1"},
{"client_name": "Client2", "dbo": "dboClient2"},
{"client_name": "Client3", "dbo": "dboClient3"},
]
# Chemin du CSV dans la section Files du lakehouse Silver (lakehouse par défaut
# du notebook). Le CSV doit contenir trois colonnes :
# NomTable ; ChampCléDate ; ChampCléID
# Une seule des deux colonnes clé est renseignée à la fois (ou aucune).
CHEMIN_CSV = "Files/ListeTable.csv"
print("=== Démarrage de la consolidation multi-clients ===")
print(f" {len(clients)} clients configurés : {[c['client_name'] for c in clients]}")
start_total = time.time()

# =============================================================================
# 2. LECTURE DYNAMIQUE DE LA LISTE DES TABLES
# =============================================================================
print(f"\nLecture de {CHEMIN_CSV}...")
liste_tables_df = (
spark.read
.option("header", "true")
.option("delimiter", ";")
.csv(CHEMIN_CSV)
)
# On construit un dictionnaire de configuration par table.
# Les champs vides du CSV deviennent None pour faciliter les tests booléens.
tables_config = [
{
"NomTable": row["NomTable"],
"ChampCléDate": row["ChampCléDate"] if row["ChampCléDate"] else None,
"ChampCléID": row["ChampCléID"] if row["ChampCléID"] else None,
}
for row in liste_tables_df.collect()
if row["NomTable"] is not None
]
print(f" {len(tables_config)} tables à traiter : {[t['NomTable'] for t in tables_config]}")

# =============================================================================
# 3. FONCTIONS D'ÉCRITURE (une par stratégie)
# =============================================================================
def ecrire_overwrite_complet(df_final, table_silver):
"""
Stratégie pour les tables sans clé : on réécrit toute la table Silver.
Simple et fiable, mais coûteux si la table est volumineuse.
"""
(
df_final.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable(table_silver)
)
print(f" Écriture : overwrite complet → {table_silver}")

def ecrire_overwrite_partition(df_final, table_silver, champ_date):
"""
Stratégie recommandée pour ChampCléDate.
On calcule la plage de dates présente dans le nouveau lot Bronze, puis on
réécrit uniquement la ou les partitions concernées dans le Silver.
Idempotent : rejouer le notebook sur la même plage donne le même résultat.
"""
# Calcul de la plage couverte par le nouveau lot
valeur_min = df_final.agg(F.min(col(champ_date))).collect()[0][0]
valeur_max = df_final.agg(F.max(col(champ_date))).collect()[0][0]
if valeur_min is None:
print(f" Colonne {champ_date} entièrement null, basculement en overwrite complet.")
ecrire_overwrite_complet(df_final, table_silver)
return
# Colonne de partition au format YYYY-MM pour regrouper par mois.
# Vous pouvez changer "yyyy-MM" en "yyyy" (année) ou "yyyy-MM-dd" (jour)
# selon le volume quotidien de vos données.
df_final = df_final.withColumn(
"partition_mois",
F.date_format(col(champ_date), "yyyy-MM")
)
mois_min = str(valeur_min)[:7]
mois_max = str(valeur_max)[:7]
print(f" Plage détectée : {valeur_min}{valeur_max} (partitions {mois_min} à {mois_max})")
Want to print your doc?
This is not the way.
Try clicking the ··· in the right corner or using a keyboard shortcut (
CtrlP
) instead.