# Merge each ERP table across all clients into one unified Delta table.
for table_name in tables_erp:
    print(f"Traitement de : {table_name}")

    # Collect this table from every client that exposes it, adding a constant
    # "client_name" column so the origin of each row is kept after the union.
    dfs = []
    for client in clients:
        try:
            df = spark.table(f"{client['dbo']}.{table_name}")
            df = df.withColumn("client_name", lit(client["client_name"]))
            dfs.append(df)
        except Exception as e:
            # Deliberate best effort: any failure for this client is reported
            # as a missing table and the client is skipped.
            # NOTE(review): the lookup uses client['dbo'] while the message
            # reports client['lakehouse'] — confirm this is intended.
            print(f" Table {table_name} absente dans {client['lakehouse']} : {e}")
            continue

    if not dfs:
        print(f" Aucune source disponible pour {table_name}, table ignorée.")
        continue

    # Seed the union with the first DataFrame of THIS table. Without the seed,
    # df_final is either undefined (NameError on the first table) or still
    # holds the previous table's data, which would leak into this one.
    df_final = dfs[0]
    # Start at index 1: the first DataFrame is already the seed, so including
    # it again would duplicate its rows.
    for df in dfs[1:]:
        # allowMissingColumns=True lets clients with differing column sets be
        # combined; columns absent from one side are filled with nulls.
        df_final = df_final.unionByName(df, allowMissingColumns=True)

    # Replace the unified table's contents with the merged result.
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable(f"{table_name}_unified")
    )
    print(f" Écrite dans Silver : {table_name}_unified")