Spark : v4.x - Fonctionnalités - Spark Declarative Pipelines
Vous trouverez dans cet article, des informations sur la fonctionnalité Spark Declarative Pipelines (SDP) de Spark v4.1.
Introduction
En Juin 2022, Databricks a annoncé la disponibilité générale d’une nouvelle fonctionnalité nommée Delta Live Tables (DLT) dans son écosystème et permettant le développement de manière déclarative des pipelines de traitements Spark. Cette fonctionnalité se basait principalement sur le format de données Delta Lake et sur le catalogue de données Unity Catalog.
En juin 2025, Lors de l’événement Data+AI Summit 2025, Databricks a annoncé publiquement l’ouverture en open source d’une partie cœur de son framework déclaratif de traitement de données basé sur Apache Spark en excluant, par conséquent, un grand nombre de fonctionnalités et d’optimisations qui seront disponibles uniquement sur la plateforme Databricks.
En Juillet 2025, Databricks a annoncé le renommage de la fonctionnalité Delta Live Tables en Lakeflow Declarative Pipelines qui est devenu le nom officiel de cette fonctionnalité au sein de la plateforme Databricks.
Information
Le framework Spark Declarative Pipelines (SDP) est un nouveau composant d’Apache Spark 4.1 conçu pour que les développeurs se concentrent sur les transformations de données plutôt que sur la gestion explicite des dépendances et de l’exécution des pipelines.
En pratique, le framework SDP permet de décrire ce que les données doivent être, et Spark se charge du comment : ordonnancement des tâches, parallélisme, checkpoints, tentatives.
L’API s’appuie sur des décorateurs Python et du SQL, avec un client dédié (spark-pipelines) pour exécuter les pipelines.
Le framework SDP gère l’ordre d’exécution, la résolution des dépendances et le traitement incrémental, ce qui facilite le développement, la maintenance, les tests et la surveillance des pipelines.
Le framework SDP est à l’image d’un outil tel que DBT ou SQLMesh mais il est encore immature et avec un écosystème (fonctionnalités, documentations, optimisations, possibilités, compatibilités, …) très limité.
Le guide de programmation officiel est extrêmement claire et concis pour comprendre les concepts importants.
Les éléments clés :
- Le fichier YAML de définition d’un pipeline permet de définir les éléments structurants pour le parsing et l’exécution des flows.
- Il est possible d’utiliser des scripts python (avec des décorateurs) et des scripts SQL.
- Les trois types d’objets (
datasets) utilisables :- Streaming Table : Permet de créer une table pour stocker les données traitées avec Spark Streaming
- Syntaxe Python :
@dp.tableoudp.create_streaming_table - Syntaxe SQL :
CREATE STREAMING TABLE
- Syntaxe Python :
- Materialized View : Permet de créer une table pour stocker les données en batch
- Syntaxe Python :
@dp.materialized_view - Syntaxe SQL :
CREATE MATERIALIZED VIEW
- Syntaxe Python :
- Temporary View : Permet de créer une vue temporaire pour les résultats intermédiaires lors de l’exécution du pipeline
- Syntaxe Python :
@dp.temporary_view - Syntaxe SQL :
CREATE TEMPORARY VIEW
- Syntaxe Python :
- Streaming Table : Permet de créer une table pour stocker les données traitées avec Spark Streaming
Les commandes spécifiques au framework SDP :
spark-pipelines init: Pour initialiser un premier projetspark-pipelines dry-run --spec pipeline_def.yaml: Pour valider un pipeline sans l’exécuter (très utile pour valider les pipelines développés lors de l’exécution d’une pipeline CI/CD)spark-pipelines run --spec pipeline_def.yaml: Pour exécuter le pipeline souhaité
Le fonctionnement générale du framework SDP :
- Début de l’exécution.
- Lecture du fichier de définition du pipeline.
- Création d’une session Spark en s’appuyant sur les éléments de configuration définis dans le fichier de définition du pipeline.
- Parsing de l’ensemble des fichiers Python ou SQL listés dans le fichier de définition du pipeline.
- Génération du graphe de flux de données (
dataflow graph) qui permet au moteur de planifier l’ordre d’exécution des différents éléments constituant le pipeline en fonction des dépendances déclarées. - Exécution des différents flux dans l’ordre défini (parallélisation et séquencement en fonction des dépendances).
- Fin de l’exécution.
Documentation :
Attention : Lors des tests, le framework avait beaucoup de mal à définir le graphe de dépendance lorsque le code était entièrement fait avec les décorateur Python. De très nombreuses erreurs était liés au fait que le framework chercher l’existant d’une table dans le catalogue de données plutôt que dans les scripts du pipeline. L’utilisation du code SQL semble beaucoup mieux gérer pour le moment.
Avantages
- Résolution automatique des dépendances et optimisation de l’exécution du pipeline : C’est le bénéfice le plus concret. L’ordre de définition des fonctions Python ou des requêtes SQL n’a pas d’importance. Le framework SDP analyse le graphe et détermine l’ordre d’exécution optimal. Le framework SDP va exécuter en parallèle les requêtes qui n’ont pas de dépendance directe puis exécuter séquentiellement celles avec des dépendances.
- Permet la validation pré-exécution avec dry-run : Le
dry-rundétecte les erreurs de syntaxe (Python ou SQL), les erreurs d’analyse (table ou colonne inexistante) et les erreurs de validation du graphe comme les dépendances cycliques sans lire ni écrire aucune donnée. Cela permet d’ajouter un contrôle automatique au niveau des pipelines CI/CD. - Format-agnostique : Le framework SDP supporte toutes les sources de données et tous les catalogues supportés par Spark. Il est recommandé d’utiliser les formats de données Iceberg ou Delta Lake.
- Facilite la maintenance : Il est beaucoup plus facile de comprendre et maintenir un pipeline se basant sur un framework déclaratif qu’un ensemble de traitements PySpark impératifs
Limitations
- Immaturité de l’écosystème (fonctionnalités et outillages) : Le framework SDP est dans un cycle de développement très actif. Le comportement de certaines fonctionnalités peut changer entre les versions de Spark. Tout n’est pas encore documenté. Il est bien trop tôt pour l’adopter en production. Les orchestrateurs populaires (Apache Airflow, Prefect, Dagster) ne gèrent pas forcément ce framework (ou de façon très limitée).
- Les features premium restent chez Databricks : Les fonctionnalités non disponibles dans le framework SDP incluent par exemple les fonctionnalités suivantes :
- L’
Auto CDC(Change Data Capture) - Les
Expectationsavecenforcementautomatique (@dp.expect, mise en quarantaine automatisée des lignes invalides) - Data lineage visuel et observabilité avancée intégrée
- Journal d’événements (event log) requêtable pour l’audit de pipeline
- L’
- Débogage moins intuitif qu’avec du code impératif : Les stack traces référencent des classes internes du framework SDP et non pas directement les lignes du code Python ou SQL. Ce point est particulièrement critique pour les équipes habituées à déboguer des jobs Spark avec des instructions de type
spark.readoudf.show(). - Limitation des fonctions et des optimisations : Beaucoup moins de possibilités pour gérer des pipelines et des traitements complexes. Cela peut nécessiter de repasser par du code impératif (exemple : gestion des états pour le streaming, besoin des fonctions non supportés comme la fonction
pivot, …).
Codes
Démarche
Pour tester le framework SDP, nous allons passer par les étapes suivantes :
- Création d’un cluster Spark v4.1.1 en local
- Création d’un jeu de données limité
- Création d’un pipeline dans un projet avec le framework SDP
- Exécution du pipeline créé
Les répertoires utilisés pour ce projet sont :
application: Répertoire pour stocker l’ensemble des scriptsdata: Répertoire pour stocker les jeux de données (en entrée et en sortie)logs: Répertoire pour stocker les logs d’exécutions du cluster Sparkspark-image-docker: Répertoire pour stocker les fichiers nécessaires à la création de l’image Docker
Cluster Spark
Pour pouvoir tester le framework SDP avec la version 4.1.1 de Spark, nous allons créer une image Docker et les éléments nécessaires pour avoir un cluster local composé d’un nœud Master (driver) et de deux nœuds Workers.
Création d’une image Docker
Étapes à réaliser à partir du répertoire spark-image-docker :
- Création du fichier
spark-defaults.confpermettant de définir les éléments de configuration pour le cluster Spark. - Création du fichier
spark-env.shpermettant de définir les variables d’environnement pour le cluster Spark. - Création du fichier
spark-start.shpermettant de définir le script d’exécution des services du cluster Spark. - Création du fichier
Dockerfilepermettant de définir le contenu de l’image Spark pour la création du cluster Spark. - Exécution de la commande de création de l’image Docker :
docker build -t spark4 spark-image-docker --no-cache.
Contenu du fichier spark-defaults.conf :
1# --- Force JARs into Classpath ---
2spark.driver.extraClassPath /opt/spark/jars/*
3spark.executor.extraClassPath /opt/spark/jars/*
4
5# --- History Server Configuration ---
6spark.eventLog.enabled true
7spark.eventLog.dir file:///opt/spark/event_logs
8spark.history.fs.logDirectory file:///opt/spark/event_logs
9
10# --- Delta Configuration ---
11spark.connect.extensions.relation.classes org.apache.spark.sql.connect.delta.DeltaRelationPlugin
12spark.connect.extensions.command.classes org.apache.spark.sql.connect.delta.DeltaCommandPlugin
13spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
14
15# --- Database Configurations ---
16spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
17
18
19# --- Default Catalog configuration ---
20spark.sql.defaultCatalog spark_catalog
21
22# --- Default warehouse local path configuration ---
23spark.sql.warehouse.dir file:///opt/spark/data/warehouse
Contenu du fichier spark-env.sh :
1#!/bin/env bash
2
3export SPARK_LOCAL_IP=`hostname -i`
4export SPARK_PUBLIC_DNS=`hostname -f`
Contenu du fichier spark-start.sh :
1#!/bin/bash
2
3# Start the SSH daemon
4/usr/sbin/sshd
5if [ $? -ne 0 ]; then
6 echo "Failed to start SSH server. Exiting."
7 exit 1
8fi
9
10if [ "$SPARK_MODE" = "master" ]; then
11 echo "Starting Spark Master..."
12 # Spark Master/Driver
13 $SPARK_HOME/sbin/start-master.sh
14 # Spark Connect
15 $SPARK_HOME/sbin/start-connect-server.sh
16 # History server
17 $SPARK_HOME/sbin/start-history-server.sh
18elif [ "$SPARK_MODE" = "worker" ]; then
19 echo "Starting Spark Worker..."
20 # Spark Worker
21 $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER_URL
22else
23 echo "Unknown SPARK_MODE: $SPARK_MODE"
24 exit 1
25fi
26
27# Keep the container alive
28tail -f $SPARK_HOME/logs/*
Contenu du fichier Dockerfile :
1# Use OpenJDK base image
2FROM eclipse-temurin:21-jdk-jammy
3
4
5# Define env variables
6ENV SPARK_MASTER="spark://spark-master:7077"
7ENV SPARK_MASTER_HOST=spark-master
8ENV SPARK_MASTER_PORT=7077
9ENV PYSPARK_PYTHON=python3
10ENV SPARK_HOME=/opt/spark
11ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
12ENV DELTA_VERSION="4.1.0"
13ENV SPARK_VERSION="4.1.1"
14ENV SCALA_VERSION="2.13"
15
16
17# Install tools
18RUN apt-get update \
19 && apt-get install -y --no-install-recommends wget tar iputils-ping rsync openssh-server openssh-client \
20 && apt-get install -y --no-install-recommends python3 python3-pip \
21 && rm -rf /var/lib/apt/lists/*
22
23# Manage SSH informations
24RUN mkdir -p /root/.ssh/ \
25 && ssh-keygen -t rsa -f /root/.ssh/id_rsa -q -N "" \
26 && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
27 && chmod 600 /root/.ssh/authorized_keys \
28 && echo "Host *" >> /root/.ssh/config \
29 && echo " StrictHostKeyChecking no" >> /root/.ssh/config \
30 && chmod 600 /root/.ssh/config \
31 && mkdir -p /var/run/sshd \
32 && ssh-keygen -A
33
34# Install Spark
35RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
36RUN tar -xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz \
37 && rm spark-${SPARK_VERSION}-bin-hadoop3.tgz \
38 && mv spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME} \
39 && chown -R root:root ${SPARK_HOME} \
40 && mkdir -p ${SPARK_HOME}/logs \
41 && mkdir -p ${SPARK_HOME}/event_logs
42
43# Download the Delta Spark JAR directly into Spark's main jars directory
44# Spark automatically loads all JARs from this folder on startup.
45RUN wget https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VERSION}/${DELTA_VERSION}/delta-spark_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
46RUN wget https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
47RUN wget https://repo1.maven.org/maven2/io/delta/delta-connect-client_${SCALA_VERSION}/${DELTA_VERSION}/delta-connect-client_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
48
49# Set up Spark configuration for logging and history server
50COPY spark-defaults.conf $SPARK_HOME/conf/spark-defaults.conf
51
52# Set up Spark configuration scripts
53COPY spark-env.sh $SPARK_HOME/conf/spark-env.sh
54COPY spark-start.sh $SPARK_HOME/spark-start.sh
55RUN chmod +x $SPARK_HOME/conf/spark-env.sh
56RUN chmod +x $SPARK_HOME/spark-start.sh
57
58# Expose needed ports
59EXPOSE 7077 8080 4040 15002 22
60
61# Entrypoint config
62CMD ["/opt/spark/spark-start.sh"]
Création du fichier Docker Compose
Étapes à réaliser à partir du répertoire racine :
- Création du fichier
compose.ymlpermettant de définir les différents éléments du cluster qui sera composé d’un nœudMasteret de deux nœudsWorkers - Démarrer le cluster local avec la commande
docker-compose up -d - Arrêter le cluster local avec la commande
docker-compose down
Contenu du fichier compose.yml :
1services:
2
3 spark-master:
4 image: spark4
5 container_name: spark-master
6 hostname: spark-master
7 environment:
8 - SPARK_MODE=master
9 - SPARK_RPC_AUTHENTICATION_ENABLED=false
10 - SPARK_RPC_ENCRYPTION_ENABLED=false
11 - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=false
12 - SPARK_SSL_ENABLED=false
13 - SPARK_PUBLIC_DNS=spark-master
14 - SPARK_MASTER_HOST=spark-master
15 - SPARK_MASTER_PORT=7077
16 - SPARK_DRIVER_MEMORY=1g
17 - SPARK_DRIVER_CORES=1
18 - SPARK_EXECUTOR_MEMORY=1g
19 - SPARK_MASTER_WEBUI_PORT=8080
20 ports:
21 - "4040:4040" # Application UI (Job Details)
22 - "8080:8080" # Interface web du master
23 - "7077:7077" # Port de communication Spark
24 - "15002:15002" # Port Spark Connect
25 - "18080:18080" # Interface History Server
26 deploy:
27 resources:
28 limits:
29 cpus: '1'
30 memory: 1G
31 volumes:
32 - ./data:/opt/spark/data
33 - ./logs/events:/opt/spark/event_logs
34 - ./application/src:/home/root/src
35 networks:
36 - spark-network
37
38
39 spark-worker-1:
40 image: spark4
41 container_name: spark-worker-1
42 hostname: spark-worker-1
43 depends_on:
44 - spark-master
45 environment:
46 - SPARK_MODE=worker
47 - SPARK_MASTER_URL=spark://spark-master:7077
48 - SPARK_RPC_AUTHENTICATE=false
49 - SPARK_RPC_ENCRYPTION=false
50 - SPARK_LOCAL_STORAGE_ENCRYPTION=false
51 - SPARK_SSL_ENABLED=no
52 - SPARK_PUBLIC_DNS=spark-worker-1
53 - SPARK_MASTER_HOST=spark-master
54 - SPARK_MASTER_PORT=7077
55 - SPARK_WORKER_CORES=2
56 - SPARK_WORKER_MEMORY=2g
57 - SPARK_EXECUTOR_MEMORY=1g
58 - SPARK_WORKER_WEBUI_PORT=8081
59 ports:
60 - "8081:8081" # Interface web (worker)
61 volumes:
62 - ./data:/opt/spark/data
63 networks:
64 - spark-network
65 deploy:
66 resources:
67 limits:
68 cpus: '2'
69 memory: 2G
70
71
72 spark-worker-2:
73 image: spark4
74 container_name: spark-worker-2
75 hostname: spark-worker-2
76 depends_on:
77 - spark-master
78 environment:
79 - SPARK_MODE=worker
80 - SPARK_MASTER_URL=spark://spark-master:7077
81 - SPARK_RPC_AUTHENTICATE=false
82 - SPARK_RPC_ENCRYPTION=false
83 - SPARK_LOCAL_STORAGE_ENCRYPTION=false
84 - SPARK_SSL_ENABLED=no
85 - SPARK_PUBLIC_DNS=spark-worker-2
86 - SPARK_MASTER_HOST=spark-master
87 - SPARK_MASTER_PORT=7077
88 - SPARK_WORKER_CORES=2
89 - SPARK_WORKER_MEMORY=2g
90 - SPARK_EXECUTOR_MEMORY=1g
91 - SPARK_WORKER_WEBUI_PORT=8081
92 ports:
93 - "8082:8081" # Interface web (worker)
94 volumes:
95 - ./data:/opt/spark/data
96 networks:
97 - spark-network
98 deploy:
99 resources:
100 limits:
101 cpus: '2'
102 memory: 2G
103
104
105networks:
106 spark-network:
107 driver: bridge
Contenu du fichier de log après exécution de la commande docker-compose up -d :
1[+] up 4/4
2 ✔ Network spark4_spark-network Created
3 ✔ Container spark-master Created
4 ✔ Container spark-worker-1 Created
5 ✔ Container spark-worker-2 Created
Contenu du fichier de log après exécution de la commande docker-compose down :
1[+] down 4/4
2 ✔ Container spark-worker-2 Removed
3 ✔ Container spark-worker-1 Removed
4 ✔ Container spark-master Removed
5 ✔ Network spark4_spark-network Removed
Liste des ports et des interfaces
En se basant sur la configuration définie dans le fichier compose.yml :
4040: Port de communication avec l’interface applicative (UI) des jobs7077: Port de communication interne du cluster Spark8080: Port de communication avec l’interface web du Master (driver)8081: Port de communication avec l’interface web du Worker n°18082: Port de communication avec l’interface web du Worker n°215002: Port de communication avec le serveur Spark Connect18080: Port de communication avec l’interface History Server
Jeu de données
Dans le répertoire data :
- Création d’un répertoire
input:mkdir -p data/input - Création du fichier
clients.csvpour stocker quelques lignes définissants des clients - Création du fichier
products.csvpour stocker quelques lignes définissant les produits - Création du fichier
orders.csvpour stocker quelques lignes définissant des événements d’achats des produits par les clients
Contenu du fichier clients.csv :
1id_client,lib_client,lib_city
21,Alice,Paris
32,Bob,Lyon
43,Charlie,Marseille
54,David,Lille
65,Eve,Bordeaux
Contenu du fichier products.csv :
1id_product,lib_product,mnt_prix_unit,lib_category
2101,Laptop,1200.0,Electronique
3102,Souris,25.0,Accessoires
4103,Clavier,45.0,Accessoires
5104,Ecran,200.0,Electronique
6105,Casque,80.0,Audio
Contenu du fichier orders.csv :
1id_commande,id_client,id_product,nb_quantity,dt_commande
21,1,101,1,2025-10-01
32,2,102,2,2025-10-01
43,3,103,1,2025-10-02
54,1,104,1,2025-10-02
65,5,105,2,2025-10-03
76,2,101,1,2025-10-03
87,4,103,3,2025-10-04
98,1,102,1,2025-10-04
109,3,101,1,2025-10-05
1110,5,104,1,2025-10-05
1211,2,105,1,2025-10-06
1312,4,101,1,2025-10-06
1413,1,105,1,2025-10-07
1514,3,102,4,2025-10-07
1615,5,103,1,2025-10-08
Création d’un pipeline dans un projet SDP
Les étapes sont les suivantes :
- Installation du framework SDP en local
- Initialisation d’un projet SDP template en utilisant la commande
spark-pipelines init - Création des éléments nécessaires au pipeline
Installation du framework SDP en local
L’installation des éléments nécessaires pour pouvoir tester le framework SDP se fait avec la commande pip suivante : pip install pyspark[pipelines].
Suite à certain problème de compatibilité des dépendances sur mon environnement de travail, j’ai utilisé la commande suivante pour installer les éléments nécessaires avec les versions compatibles correspondantes : pip install pyspark==4.1.1 pyspark[pipelines] protobuf==6.33.0
Initialisation d’un projet SDP
Afin de pouvoir initialiser un projet SDP, le plus simple est de se positionner dans le répertoire application et d’exécuter la commande suivante spark-pipelines init --name LoadOrdersData.
Cela aura pour effet de créer un répertoire nommé LoadOrdersData dans le répertoire application ainsi que l’arborescence suivante :
pipeline-storage/: Répertoire permettant de stocker les checkpoints lors de l’utilisation de Spark Streamingtransformations/: Répertoire permettant de stocker l’ensemble des scripts Python et SQL définissant les pipelinesspark-pipeline.yml: Fichier de configuration du pipeline
Création des éléments nécessaire au pipeline
L’objectif est de créer le pipeline suivante :
graph TD;
A(clients.csv) --load_bronze_orders.py--> D(sdp_bronze.raw_clients)
B(products.csv) --load_bronze_orders.py--> E(sdp_bronze.raw_products)
C(orders.csv) --load_bronze_orders.py--> F(sdp_bronze.raw_orders)
D(sdp_bronze.raw_clients) --load_silver_orders.sql--> G(sdp_silver.refined_clients)
E(sdp_bronze.raw_products) --load_silver_orders.sql--> H(sdp_silver.refined_products)
F(sdp_bronze.raw_orders) --load_silver_orders.sql--> I(sdp_silver.refined_orders)
H(sdp_silver.refined_products) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
I(sdp_silver.refined_orders) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
G(sdp_silver.refined_clients) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
H(sdp_silver.refined_products) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
I(sdp_silver.refined_orders) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
Les étapes sont les suivantes :
- Création du script python
load_bronze_orders.pypermettant de définir les tables pour stocker les données des fichiers CSV - Création du script SQL
load_silver_orders.sqlpermettant de raffiner les tables de la zone bronze dans la zone silver - Création du script SQL
load_gold_orders.sqlpermettant d’exécuter des requêtes d’agrégations à partir des tables de la zone silver
Contenu du fichier load_bronze_orders.py :
1from pyspark import pipelines as dp
2from pyspark.sql import DataFrame, SparkSession
3from pyspark.sql.functions import current_timestamp
4
5
6PATH_DATA_INPUT="file:///opt/spark/data/input"
7spark = SparkSession.active()
8
9
10# --- Bronze zone (raw loading) ---
11@dp.materialized_view(name="sdp_bronze.raw_orders")
12def raw_orders() -> DataFrame:
13 return spark.read \
14 .format("csv") \
15 .option("header",True) \
16 .option("inferSchema",True) \
17 .load(f"{PATH_DATA_INPUT}/orders.csv") \
18 .withColumn("ts_load_file", current_timestamp())
19
20@dp.materialized_view(name="sdp_bronze.raw_clients")
21def raw_clients() -> DataFrame:
22 return spark.read \
23 .format("csv") \
24 .option("header",True) \
25 .option("inferSchema",True) \
26 .load(f"{PATH_DATA_INPUT}/clients.csv") \
27 .withColumn("ts_load_file", current_timestamp())
28
29@dp.materialized_view(name="sdp_bronze.raw_products")
30def raw_products() -> DataFrame:
31 return spark.read \
32 .format("csv") \
33 .option("header",True) \
34 .option("inferSchema",True) \
35 .load(f"{PATH_DATA_INPUT}/products.csv") \
36 .withColumn("ts_load_file", current_timestamp())
Contenu du fichier load_silver_orders.sql :
1--- Silver Zone (cleaned & typed) ---
2CREATE MATERIALIZED VIEW sdp_silver.refined_orders
3AS
4SELECT int(id_commande)
5 ,int(id_client)
6 ,int(id_product)
7 ,int(nb_quantity)
8 ,to_date(dt_commande) as dt_commande
9 ,to_timestamp(ts_load_file) as ts_load_file
10FROM sdp_bronze.raw_orders
11WHERE id_commande IS NOT NULL
12;
13
14
15CREATE MATERIALIZED VIEW sdp_silver.refined_clients
16AS
17SELECT int(id_client)
18 ,string(lib_client)
19 ,string(lib_city)
20 ,to_timestamp(ts_load_file) as ts_load_file
21FROM sdp_bronze.raw_clients
22WHERE id_client IS NOT NULL
23;
24
25
26CREATE MATERIALIZED VIEW sdp_silver.refined_products
27AS
28SELECT int(id_product)
29 ,string(lib_product)
30 ,double(mnt_prix_unit)
31 ,string(lib_category)
32 ,to_timestamp(ts_load_file) as ts_load_file
33FROM sdp_bronze.raw_products
34WHERE id_product IS NOT NULL
35;
Contenu du fichier load_gold_orders.sql :
1--- Gold Zone (Business analyses) ---
2CREATE MATERIALIZED VIEW sdp_gold.dist_by_client
3AS
4SELECT c.id_client, c.lib_client, ROUND(SUM(o.nb_quantity * p.mnt_prix_unit), 2) as total_expense
5FROM sdp_silver.refined_orders o
6INNER JOIN sdp_silver.refined_clients c
7ON (o.id_client = c.id_client)
8INNER JOIN sdp_silver.refined_products p
9ON (o.id_product = p.id_product)
10GROUP BY c.id_client, c.lib_client
11ORDER BY total_expense DESC
12;
13
14CREATE MATERIALIZED VIEW sdp_gold.dist_by_product
15AS
16SELECT p.id_product, p.lib_product, SUM(o.nb_quantity) as sales_volume
17FROM sdp_silver.refined_orders o
18INNER JOIN sdp_silver.refined_products p
19ON (o.id_product = p.id_product)
20GROUP BY p.id_product, p.lib_product
21ORDER BY sales_volume DESC
22;
Création des éléments pour gérer l’environnement
Afin de pouvoir gérer proprement l’environnement, nous allons créer deux scripts PySpark dans le répertoire application/LoadOrdersData :
- Le script
init_schemas.pypermettant de créer les schémas nécessaires à la création des tables du pipelinessdp_bronze: pour le stockage des tables bronzesdp_silver: pour le stockage des tables silversdp_gold: pour le stockage des tables gold
- Le script
check_list_tables.pypermettant d’afficher l’ensemble des éléments créés
Contenu du script init_schemas.py :
1from pyspark.sql import SparkSession
2from pyspark.sql.functions import col
3
4REMOTE_URL = "sc://localhost:15002"
5
6spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()
7
8spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_bronze COMMENT 'Schema for SDP Pipelines - Bronze Zone'")
9spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_silver COMMENT 'Schema for SDP Pipelines - Silver Zone'")
10spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_gold COMMENT 'Schema for SDP Pipelines - Gold Zone'")
11
12list_schemas = spark.sql("show schemas;").where(col("namespace").like('sdp_%'))
13
14nb_sdp_schema = list_schemas.count()
15
16if (nb_sdp_schema == 3):
17 print(f"[INFO] The SDP Schemas have been successfully created !")
18else :
19 print(f"[WARN] The SDP Schemas were not created successfully !")
20
21list_schemas.show()
22
23spark.stop()
Contenu du script check_list_tables.py :
1from pyspark.sql import SparkSession
2from pyspark.sql.functions import col
3
4REMOTE_URL = "sc://localhost:15002"
5
6spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()
7
8list_tables = ['clients','products','orders']
9list_tables_gold = ['dist_by_client','dist_by_product']
10
11print(f"[INFO] Data from SDP_BRONZE :")
12for table in list_tables :
13 tmp_table = f"sdp_bronze.raw_{table}"
14 print(f"- {tmp_table} :")
15 spark.table(tmp_table).show(truncate=False)
16
17print(f"[INFO] Data from SDP_SILVER :")
18for table in list_tables :
19 tmp_table = f"sdp_silver.refined_{table}"
20 print(f"- {tmp_table} :")
21 spark.table(tmp_table).show(truncate=False)
22
23print(f"[INFO] Data from SDP_GOLD :")
24for table in list_tables_gold :
25 tmp_table = f"sdp_gold.{table}"
26 print(f"- {tmp_table} :")
27 spark.table(tmp_table).show(truncate=False)
28
29spark.stop()
Résultat de l’exécution du pipeline créé
Exécution du pipeline
L’ensemble des commandes seront exécutés à partir du répertoire application/LoadOrdersData/
L’ordre d’exécution est le suivant :
- Exécution du script d’initialisation des schémas avec la commande
python init_schemas.py - Vérifier que le message affiché par le script
init_schemas.pyest[INFO] The SDP Schemas have been successfully created ! - Exécution du pipeline avec la commande
spark-pipelines run --spec spark-pipeline.yml - Exécution du script de vérification des objets créés avec la commande
python check_list_tables.py
Résultat de l’exécution
Résultat de l’exécution de la commande python init_schemas.py :
1[INFO] The SDP Schemas have been successfully created !
2+----------+
3| namespace|
4+----------+
5|sdp_bronze|
6| sdp_gold|
7|sdp_silver|
8+----------+
Résultat de l’exécution de la commande spark-pipelines run --spec spark-pipeline.yml :
12026-03-30 19:31:25: Loading pipeline spec from spark-pipeline.yml...
22026-03-30 19:31:25: Creating Spark session...
32026-03-30 19:31:25: Creating dataflow graph...
42026-03-30 19:31:25: Registering graph elements...
52026-03-30 19:31:25: Loading definitions. Root directory: '.../application/LoadOrdersData'.
62026-03-30 19:31:25: Found 3 files matching glob 'transformations/**/*'
72026-03-30 19:31:25: Importing .../application/LoadOrdersData/transformations/load_bronze_orders.py...
82026-03-30 19:31:27: Registering SQL file .../application/LoadOrdersData/transformations/load_gold_orders.sql...
92026-03-30 19:31:28: Registering SQL file .../application/LoadOrdersData/transformations/load_silver_orders.sql...
102026-03-30 19:31:28: Starting run...
112026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_product is QUEUED.
122026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_orders is QUEUED.
132026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is QUEUED.
142026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_products is QUEUED.
152026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is QUEUED.
162026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is QUEUED.
172026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_client is QUEUED.
182026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_clients is QUEUED.
192026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is PLANNING.
202026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is STARTING.
212026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is RUNNING.
222026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is PLANNING.
232026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is STARTING.
242026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is RUNNING.
252026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is PLANNING.
262026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is STARTING.
272026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is RUNNING.
282026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_orders has COMPLETED.
292026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_products has COMPLETED.
302026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_clients has COMPLETED.
312026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is PLANNING.
322026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is STARTING.
332026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is RUNNING.
342026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is PLANNING.
352026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is STARTING.
362026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is RUNNING.
372026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is PLANNING.
382026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is STARTING.
392026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is RUNNING.
402026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_clients has COMPLETED.
412026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_products has COMPLETED.
422026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_orders has COMPLETED.
432026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is PLANNING.
442026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is STARTING.
452026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is RUNNING.
462026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is PLANNING.
472026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is STARTING.
482026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is RUNNING.
492026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_product has COMPLETED.
502026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_client has COMPLETED.
512026-03-30 19:32:17: Run is COMPLETED.
Résultat de l’exécution de la commande python check_list_tables.py :
1[INFO] Data from SDP_BRONZE :
2- sdp_bronze.raw_clients :
3+---------+----------+---------+--------------------------+
4|id_client|lib_client|lib_city |ts_load_file |
5+---------+----------+---------+--------------------------+
6|1 |Alice |Paris |2026-03-30 19:31:38.862824|
7|2 |Bob |Lyon |2026-03-30 19:31:38.862824|
8|3 |Charlie |Marseille|2026-03-30 19:31:38.862824|
9|4 |David |Lille |2026-03-30 19:31:38.862824|
10|5 |Eve |Bordeaux |2026-03-30 19:31:38.862824|
11+---------+----------+---------+--------------------------+
12
13- sdp_bronze.raw_products :
14+----------+-----------+-------------+------------+--------------------------+
15|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file |
16+----------+-----------+-------------+------------+--------------------------+
17|101 |Laptop |1200.0 |Electronique|2026-03-30 19:31:38.859625|
18|102 |Souris |25.0 |Accessoires |2026-03-30 19:31:38.859625|
19|103 |Clavier |45.0 |Accessoires |2026-03-30 19:31:38.859625|
20|104 |Ecran |200.0 |Electronique|2026-03-30 19:31:38.859625|
21|105 |Casque |80.0 |Audio |2026-03-30 19:31:38.859625|
22+----------+-----------+-------------+------------+--------------------------+
23
24- sdp_bronze.raw_orders :
25+-----------+---------+----------+-----------+-----------+--------------------------+
26|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file |
27+-----------+---------+----------+-----------+-----------+--------------------------+
28|1 |1 |101 |1 |2025-10-01 |2026-03-30 19:31:38.762104|
29|2 |2 |102 |2 |2025-10-01 |2026-03-30 19:31:38.762104|
30|3 |3 |103 |1 |2025-10-02 |2026-03-30 19:31:38.762104|
31|4 |1 |104 |1 |2025-10-02 |2026-03-30 19:31:38.762104|
32|5 |5 |105 |2 |2025-10-03 |2026-03-30 19:31:38.762104|
33|6 |2 |101 |1 |2025-10-03 |2026-03-30 19:31:38.762104|
34|7 |4 |103 |3 |2025-10-04 |2026-03-30 19:31:38.762104|
35|8 |1 |102 |1 |2025-10-04 |2026-03-30 19:31:38.762104|
36|9 |3 |101 |1 |2025-10-05 |2026-03-30 19:31:38.762104|
37|10 |5 |104 |1 |2025-10-05 |2026-03-30 19:31:38.762104|
38|11 |2 |105 |1 |2025-10-06 |2026-03-30 19:31:38.762104|
39|12 |4 |101 |1 |2025-10-06 |2026-03-30 19:31:38.762104|
40|13 |1 |105 |1 |2025-10-07 |2026-03-30 19:31:38.762104|
41|14 |3 |102 |4 |2025-10-07 |2026-03-30 19:31:38.762104|
42|15 |5 |103 |1 |2025-10-08 |2026-03-30 19:31:38.762104|
43+-----------+---------+----------+-----------+-----------+--------------------------+
44
45[INFO] Data from SDP_SILVER :
46- sdp_silver.refined_clients :
47+---------+----------+---------+--------------------------+
48|id_client|lib_client|lib_city |ts_load_file |
49+---------+----------+---------+--------------------------+
50|1 |Alice |Paris |2026-03-30 19:31:38.862824|
51|2 |Bob |Lyon |2026-03-30 19:31:38.862824|
52|3 |Charlie |Marseille|2026-03-30 19:31:38.862824|
53|4 |David |Lille |2026-03-30 19:31:38.862824|
54|5 |Eve |Bordeaux |2026-03-30 19:31:38.862824|
55+---------+----------+---------+--------------------------+
56
57- sdp_silver.refined_products :
58+----------+-----------+-------------+------------+--------------------------+
59|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file |
60+----------+-----------+-------------+------------+--------------------------+
61|101 |Laptop |1200.0 |Electronique|2026-03-30 19:31:38.859625|
62|102 |Souris |25.0 |Accessoires |2026-03-30 19:31:38.859625|
63|103 |Clavier |45.0 |Accessoires |2026-03-30 19:31:38.859625|
64|104 |Ecran |200.0 |Electronique|2026-03-30 19:31:38.859625|
65|105 |Casque |80.0 |Audio |2026-03-30 19:31:38.859625|
66+----------+-----------+-------------+------------+--------------------------+
67
68- sdp_silver.refined_orders :
69+-----------+---------+----------+-----------+-----------+--------------------------+
70|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file |
71+-----------+---------+----------+-----------+-----------+--------------------------+
72|1 |1 |101 |1 |2025-10-01 |2026-03-30 19:31:38.762104|
73|2 |2 |102 |2 |2025-10-01 |2026-03-30 19:31:38.762104|
74|3 |3 |103 |1 |2025-10-02 |2026-03-30 19:31:38.762104|
75|4 |1 |104 |1 |2025-10-02 |2026-03-30 19:31:38.762104|
76|5 |5 |105 |2 |2025-10-03 |2026-03-30 19:31:38.762104|
77|6 |2 |101 |1 |2025-10-03 |2026-03-30 19:31:38.762104|
78|7 |4 |103 |3 |2025-10-04 |2026-03-30 19:31:38.762104|
79|8 |1 |102 |1 |2025-10-04 |2026-03-30 19:31:38.762104|
80|9 |3 |101 |1 |2025-10-05 |2026-03-30 19:31:38.762104|
81|10 |5 |104 |1 |2025-10-05 |2026-03-30 19:31:38.762104|
82|11 |2 |105 |1 |2025-10-06 |2026-03-30 19:31:38.762104|
83|12 |4 |101 |1 |2025-10-06 |2026-03-30 19:31:38.762104|
84|13 |1 |105 |1 |2025-10-07 |2026-03-30 19:31:38.762104|
85|14 |3 |102 |4 |2025-10-07 |2026-03-30 19:31:38.762104|
86|15 |5 |103 |1 |2025-10-08 |2026-03-30 19:31:38.762104|
87+-----------+---------+----------+-----------+-----------+--------------------------+
88
89[INFO] Data from SDP_GOLD :
90- sdp_gold.dist_by_client :
91+---------+----------+-------------+
92|id_client|lib_client|total_expense|
93+---------+----------+-------------+
94|1 |Alice |1505.0 |
95|3 |Charlie |1345.0 |
96|4 |David |1335.0 |
97|2 |Bob |1330.0 |
98|5 |Eve |405.0 |
99+---------+----------+-------------+
100
101- sdp_gold.dist_by_product :
102+----------+-----------+------------+
103|id_product|lib_product|sales_volume|
104+----------+-----------+------------+
105|102 |Souris |7 |
106|103 |Clavier |5 |
107|105 |Casque |4 |
108|101 |Laptop |4 |
109|104 |Ecran |2 |
110+----------+-----------+------------+

