Skip to content
fabric
Support formation Microsoft Fabric
  • Pages
    • Présentation du support
      • Cours DP-600
        • Untitled page
    • Organisation des formations Power BI & Fabric
    • Présentation de Fabric
      • Architecture Médaillon
      • Tableau comparatif : Power BI standalone vs Microsoft Fabric
      • 36 questions à se poser pour démarrer sur Fabric
      • 10 choses à arrêter de faire / commencer à faire sur Fabric
      • 7 erreurs de capacités, espaces de travail et de contrôle d'accès dans Fabric
      • Guides de décision
    • Charges de travail Fabric
      • Lakehouse
        • Gestion du partage d'un lakehouse
        • Ingestion avec un notebook
      • Warehouse
        • Gestion du partage d'un warehouse
      • Pipeline (Data Factory)
      • Real time
      • Data Science
      • Stratégies d'actualisation
    • Feuille de route d'adoption de Microsoft Fabric
      • Synthèse
    • Administration de Fabric
      • Licences Fabric
      • Sécurité
      • Rôles dans les espaces de travail
      • Superviser et gérer
      • Paramètres du client (tenant settings) - Portail d’administration Power BI
    • Suivi des évolutions
    • Exercices
      • Ressources pédagogiques
      • Exercice GreenCycle (Dataflow Gen2)
      • Exercice Lakehouse
    • Domaines

Ingestion avec un notebook

Depuis un espace de travail ou dans un lakehouse, créer un notebook.

Récapitulatif des commandes clés

Opération
Code PySpark
Lire un CSV
spark.read.format("csv").option("header", True).load("Files/fichier.csv")
Lire plusieurs CSV
spark.read.format("csv").schema(mon_schema).load("Files/dossier/*.csv")
Afficher les données
display(df) ou df.show()
Voir le schéma
df.printSchema()
Compter les lignes
df.count()
Supprimer les doublons
df.dropDuplicates()
Supprimer les nulls
df.dropna(subset=["colonne"])
Ajouter une colonne
df.withColumn("nom", expression)
Renommer une colonne
df.withColumnRenamed("ancien", "nouveau")
Filtrer
df.filter(col("colonne") > valeur)
Agréger
df.groupBy("colonne").agg(sum("valeur"))
Sauver en Delta
df.write.mode("overwrite").format("delta").saveAsTable("table")
Ajouter à une table
df.write.mode("append").format("delta").saveAsTable("table")
Requête SQL
spark.sql("SELECT * FROM table")
SQL dans une cellule
%%sql en première ligne de la cellule
There are no rows in this table

Ingérer plusieurs tables de plusieurs lakehouse


Préparer les raccourcis

Dans un lakehouse, créer des raccourcis vers les schémas des autres lakehouse :
Obtenir des données > Nouveau raccourci
Sélectionner OneLake puis le lakehouse et Suivant
Dans Tables, cocher dbo (ou le schéma concerné), puis Suivant
Dans la dernière étape, cliquer sur Crayon dans Actions : ​
image.png
Donner un nom spécifique au dbo, par ex. dboClientA (doit être unique dans le lakehouse)
Cliquer sur Créer.

Préparer le fichier des noms de tables

Créer un fichier CSV NomTable.csv, écrire NomTable en première ligne, puis les noms des tables sut chaque ligne suivante.
Obtenir des données > Charger des fichiers
Cliquer sur le dossier pour afficher la boite Ouvrir
Sélectionner le fichier CSV et Charger. Fermer le volet. Le fichier est dans le dossier Files.

Compléter le notebook

Ouvrir le notebook > Nouveau notebook.
Écrire le code suivant :
from pyspark.sql.functions import lit

# Définition des clients et de leur lakehouse source
clients = [
{"client_name": "Client1", "dbo": "dboClient1"},
{"client_name": "Client2", "dbo": "dboClient2"}
]
Remplacer Client1 pour le vrai nom du client, qui sera placé dans une nouvelle colonne.
Remplacer dboClient1 par le nom du schéma lié.
# Lecture du CSV depuis la section Files du lakehouse
liste_tables_df = spark.read \
.option("header", "true") \
.option("delimiter", ";") \
.csv("Files/ListeTable.csv")
Remplacer Files/ListeTable.csv par votre “vrai” chemin si besoin.
Vérifier aussi la casse.
tables_erp = [
row["NomTable"]
for row in liste_tables_df.collect()
if row["NomTable"] is not None
]
“NomTable” est indiqué sur la première ligne du fichier CSV
# Fusion pour chaque table
for table_name in tables_erp:
print(f"Traitement de : {table_name}")
dfs = []

for client in clients:
try:
df = spark.table(f"{client['dbo']}.{table_name}")
df = df.withColumn("client_name", lit(client["client_name"]))
dfs.append(df)
except Exception as e:
print(f" Table {table_name} absente dans {client['lakehouse']} : {e}")
continue

if not dfs:
print(f" Aucune source disponible pour {table_name}, table ignorée.")
continue

# df_final = dfs[0]
for df in dfs[0:]:
df_final = df_final.unionByName(df, allowMissingColumns=True)

df_final.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.saveAsTable(f"{table_name}_unified")

print(f" Écrite dans Silver : {table_name}_unified")
Boucle sur le liste des tables.
Sous-boucle sur la liste des clients.
Si une table n’existe pas dans le
There are no rows in this table
Enfin, exécuter le notebook





Want to print your doc?
This is not the way.
Try clicking the ··· in the right corner or using a keyboard shortcut (
CtrlP
) instead.