Spark : v4.x - Fonctionnalités - Spark Declarative Pipelines

Vous trouverez dans cet article, des informations sur la fonctionnalité Spark Declarative Pipelines (SDP) de Spark v4.1.

Introduction

En Juin 2022, Databricks a annoncé la disponibilité générale d’une nouvelle fonctionnalité nommée Delta Live Tables (DLT) dans son écosystème et permettant le développement de manière déclarative des pipelines de traitements Spark. Cette fonctionnalité se basait principalement sur le format de données Delta Lake et sur le catalogue de données Unity Catalog.

En juin 2025, Lors de l’événement Data+AI Summit 2025, Databricks a annoncé publiquement l’ouverture en open source d’une partie cœur de son framework déclaratif de traitement de données basé sur Apache Spark en excluant, par conséquent, un grand nombre de fonctionnalités et d’optimisations qui seront disponibles uniquement sur la plateforme Databricks.

En Juillet 2025, Databricks a annoncé le renommage de la fonctionnalité Delta Live Tables en Lakeflow Declarative Pipelines qui est devenu le nom officiel de cette fonctionnalité au sein de la plateforme Databricks.

Information

Le framework Spark Declarative Pipelines (SDP) est un nouveau composant d’Apache Spark 4.1 conçu pour que les développeurs se concentrent sur les transformations de données plutôt que sur la gestion explicite des dépendances et de l’exécution des pipelines.

En pratique, le framework SDP permet de décrire ce que les données doivent être, et Spark se charge du comment : ordonnancement des tâches, parallélisme, checkpoints, tentatives. L’API s’appuie sur des décorateurs Python et du SQL, avec un client dédié (spark-pipelines) pour exécuter les pipelines.

Le framework SDP gère l’ordre d’exécution, la résolution des dépendances et le traitement incrémental, ce qui facilite le développement, la maintenance, les tests et la surveillance des pipelines.

Le framework SDP est à l’image d’un outil tel que DBT ou SQLMesh mais il est encore immature et avec un écosystème (fonctionnalités, documentations, optimisations, possibilités, compatibilités, …) très limité.

Le guide de programmation officiel est extrêmement claire et concis pour comprendre les concepts importants.

Les éléments clés :

  • Le fichier YAML de définition d’un pipeline permet de définir les éléments structurants pour le parsing et l’exécution des flows.
  • Il est possible d’utiliser des scripts python (avec des décorateurs) et des scripts SQL.
  • Les trois types d’objets (datasets) utilisables :
    • Streaming Table : Permet de créer une table pour stocker les données traitées avec Spark Streaming
      • Syntaxe Python : @dp.table ou dp.create_streaming_table
      • Syntaxe SQL : CREATE STREAMING TABLE
    • Materialized View : Permet de créer une table pour stocker les données en batch
      • Syntaxe Python : @dp.materialized_view
      • Syntaxe SQL : CREATE MATERIALIZED VIEW
    • Temporary View : Permet de créer une vue temporaire pour les résultats intermédiaires lors de l’exécution du pipeline
      • Syntaxe Python : @dp.temporary_view
      • Syntaxe SQL : CREATE TEMPORARY VIEW

Les commandes spécifiques au framework SDP :

  • spark-pipelines init : Pour initialiser un premier projet
  • spark-pipelines dry-run --spec pipeline_def.yaml : Pour valider un pipeline sans l’exécuter (très utile pour valider les pipelines développés lors de l’exécution d’une pipeline CI/CD)
  • spark-pipelines run --spec pipeline_def.yaml : Pour exécuter le pipeline souhaité

Le fonctionnement générale du framework SDP :

  1. Début de l’exécution.
  2. Lecture du fichier de définition du pipeline.
  3. Création d’une session Spark en s’appuyant sur les éléments de configuration définis dans le fichier de définition du pipeline.
  4. Parsing de l’ensemble des fichiers Python ou SQL listés dans le fichier de définition du pipeline.
  5. Génération du graphe de flux de données (dataflow graph) qui permet au moteur de planifier l’ordre d’exécution des différents éléments constituant le pipeline en fonction des dépendances déclarées.
  6. Exécution des différents flux dans l’ordre défini (parallélisation et séquencement en fonction des dépendances).
  7. Fin de l’exécution.

Documentation :

Attention : Lors des tests, le framework avait beaucoup de mal à définir le graphe de dépendance lorsque le code était entièrement fait avec les décorateur Python. De très nombreuses erreurs était liés au fait que le framework chercher l’existant d’une table dans le catalogue de données plutôt que dans les scripts du pipeline. L’utilisation du code SQL semble beaucoup mieux gérer pour le moment.

Avantages

  1. Résolution automatique des dépendances et optimisation de l’exécution du pipeline : C’est le bénéfice le plus concret. L’ordre de définition des fonctions Python ou des requêtes SQL n’a pas d’importance. Le framework SDP analyse le graphe et détermine l’ordre d’exécution optimal. Le framework SDP va exécuter en parallèle les requêtes qui n’ont pas de dépendance directe puis exécuter séquentiellement celles avec des dépendances.
  2. Permet la validation pré-exécution avec dry-run : Le dry-run détecte les erreurs de syntaxe (Python ou SQL), les erreurs d’analyse (table ou colonne inexistante) et les erreurs de validation du graphe comme les dépendances cycliques sans lire ni écrire aucune donnée. Cela permet d’ajouter un contrôle automatique au niveau des pipelines CI/CD.
  3. Format-agnostique : Le framework SDP supporte toutes les sources de données et tous les catalogues supportés par Spark. Il est recommandé d’utiliser les formats de données Iceberg ou Delta Lake.
  4. Facilite la maintenance : Il est beaucoup plus facile de comprendre et maintenir un pipeline se basant sur un framework déclaratif qu’un ensemble de traitements PySpark impératifs

Limitations

  1. Immaturité de l’écosystème (fonctionnalités et outillages) : Le framework SDP est dans un cycle de développement très actif. Le comportement de certaines fonctionnalités peut changer entre les versions de Spark. Tout n’est pas encore documenté. Il est bien trop tôt pour l’adopter en production. Les orchestrateurs populaires (Apache Airflow, Prefect, Dagster) ne gèrent pas forcément ce framework (ou de façon très limitée).
  2. Les features premium restent chez Databricks : Les fonctionnalités non disponibles dans le framework SDP incluent par exemple les fonctionnalités suivantes :
    1. L’Auto CDC (Change Data Capture)
    2. Les Expectations avec enforcement automatique (@dp.expect, mise en quarantaine automatisée des lignes invalides)
    3. Data lineage visuel et observabilité avancée intégrée
    4. Journal d’événements (event log) requêtable pour l’audit de pipeline
  3. Débogage moins intuitif qu’avec du code impératif : Les stack traces référencent des classes internes du framework SDP et non pas directement les lignes du code Python ou SQL. Ce point est particulièrement critique pour les équipes habituées à déboguer des jobs Spark avec des instructions de type spark.read ou df.show().
  4. Limitation des fonctions et des optimisations : Beaucoup moins de possibilités pour gérer des pipelines et des traitements complexes. Cela peut nécessiter de repasser par du code impératif (exemple : gestion des états pour le streaming, besoin des fonctions non supportés comme la fonction pivot, …).

Codes

Démarche

Pour tester le framework SDP, nous allons passer par les étapes suivantes :

  1. Création d’un cluster Spark v4.1.1 en local
  2. Création d’un jeu de données limité
  3. Création d’un pipeline dans un projet avec le framework SDP
  4. Exécution du pipeline créé

Les répertoires utilisés pour ce projet sont :

  • application : Répertoire pour stocker l’ensemble des scripts
  • data : Répertoire pour stocker les jeux de données (en entrée et en sortie)
  • logs : Répertoire pour stocker les logs d’exécutions du cluster Spark
  • spark-image-docker : Répertoire pour stocker les fichiers nécessaires à la création de l’image Docker

Cluster Spark

Pour pouvoir tester le framework SDP avec la version 4.1.1 de Spark, nous allons créer une image Docker et les éléments nécessaires pour avoir un cluster local composé d’un nœud Master (driver) et de deux nœuds Workers.

Création d’une image Docker

Étapes à réaliser à partir du répertoire spark-image-docker :

  1. Création du fichier spark-defaults.conf permettant de définir les éléments de configuration pour le cluster Spark.
  2. Création du fichier spark-env.sh permettant de définir les variables d’environnement pour le cluster Spark.
  3. Création du fichier spark-start.sh permettant de définir le script d’exécution des services du cluster Spark.
  4. Création du fichier Dockerfile permettant de définir le contenu de l’image Spark pour la création du cluster Spark.
  5. Exécution de la commande de création de l’image Docker : docker build -t spark4 spark-image-docker --no-cache.

Contenu du fichier spark-defaults.conf :

 1# --- Force JARs into Classpath ---
 2spark.driver.extraClassPath      /opt/spark/jars/*
 3spark.executor.extraClassPath    /opt/spark/jars/*
 4
 5# --- History Server Configuration ---
 6spark.eventLog.enabled              true
 7spark.eventLog.dir                  file:///opt/spark/event_logs
 8spark.history.fs.logDirectory       file:///opt/spark/event_logs
 9
10# --- Delta Configuration ---
11spark.connect.extensions.relation.classes   org.apache.spark.sql.connect.delta.DeltaRelationPlugin
12spark.connect.extensions.command.classes    org.apache.spark.sql.connect.delta.DeltaCommandPlugin
13spark.sql.extensions                        io.delta.sql.DeltaSparkSessionExtension
14
15# --- Database Configurations ---
16spark.sql.catalog.spark_catalog             org.apache.spark.sql.delta.catalog.DeltaCatalog
17
18
19# --- Default Catalog configuration ---
20spark.sql.defaultCatalog                spark_catalog
21
22# --- Default warehouse local path configuration ---
23spark.sql.warehouse.dir              file:///opt/spark/data/warehouse

Contenu du fichier spark-env.sh :

1#!/bin/env bash
2
3export SPARK_LOCAL_IP=`hostname -i`
4export SPARK_PUBLIC_DNS=`hostname -f`

Contenu du fichier spark-start.sh :

 1#!/bin/bash
 2
 3# Start the SSH daemon
 4/usr/sbin/sshd
 5if [ $? -ne 0 ]; then
 6    echo "Failed to start SSH server. Exiting."
 7    exit 1
 8fi
 9
10if [ "$SPARK_MODE" = "master" ]; then
11    echo "Starting Spark Master..."
12    # Spark Master/Driver
13    $SPARK_HOME/sbin/start-master.sh 
14    # Spark Connect
15    $SPARK_HOME/sbin/start-connect-server.sh 
16    # History server
17    $SPARK_HOME/sbin/start-history-server.sh 
18elif [ "$SPARK_MODE" = "worker" ]; then
19    echo "Starting Spark Worker..."
20    # Spark Worker
21    $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER_URL
22else
23    echo "Unknown SPARK_MODE: $SPARK_MODE"
24    exit 1
25fi
26
27# Keep the container alive
28tail -f $SPARK_HOME/logs/*

Contenu du fichier Dockerfile :

 1# Use OpenJDK base image
 2FROM eclipse-temurin:21-jdk-jammy
 3
 4
 5# Define env variables
 6ENV SPARK_MASTER="spark://spark-master:7077"
 7ENV SPARK_MASTER_HOST=spark-master
 8ENV SPARK_MASTER_PORT=7077
 9ENV PYSPARK_PYTHON=python3
10ENV SPARK_HOME=/opt/spark
11ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
12ENV DELTA_VERSION="4.1.0"
13ENV SPARK_VERSION="4.1.1"
14ENV SCALA_VERSION="2.13"
15
16
17# Install tools
18RUN apt-get update \
19    && apt-get install -y --no-install-recommends wget tar iputils-ping rsync openssh-server openssh-client \
20    && apt-get install -y --no-install-recommends python3 python3-pip \
21    && rm -rf /var/lib/apt/lists/*
22
23# Manage SSH informations
24RUN mkdir -p /root/.ssh/ \
25    && ssh-keygen -t rsa -f /root/.ssh/id_rsa -q -N "" \
26    && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
27    && chmod 600 /root/.ssh/authorized_keys \
28    && echo "Host *" >> /root/.ssh/config \
29    && echo "    StrictHostKeyChecking no" >> /root/.ssh/config \
30    && chmod 600 /root/.ssh/config \
31    && mkdir -p /var/run/sshd \
32    && ssh-keygen -A
33
34# Install Spark
35RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
36RUN tar -xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz \
37    && rm spark-${SPARK_VERSION}-bin-hadoop3.tgz \
38    && mv spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME} \ 
39    && chown -R root:root ${SPARK_HOME} \
40    && mkdir -p ${SPARK_HOME}/logs \
41    && mkdir -p ${SPARK_HOME}/event_logs
42
43# Download the Delta Spark JAR directly into Spark's main jars directory
44# Spark automatically loads all JARs from this folder on startup.
45RUN wget https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VERSION}/${DELTA_VERSION}/delta-spark_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
46RUN wget https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
47RUN wget https://repo1.maven.org/maven2/io/delta/delta-connect-client_${SCALA_VERSION}/${DELTA_VERSION}/delta-connect-client_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
48
49# Set up Spark configuration for logging and history server
50COPY spark-defaults.conf $SPARK_HOME/conf/spark-defaults.conf
51
52# Set up Spark configuration scripts
53COPY spark-env.sh $SPARK_HOME/conf/spark-env.sh
54COPY spark-start.sh $SPARK_HOME/spark-start.sh
55RUN chmod +x $SPARK_HOME/conf/spark-env.sh
56RUN chmod +x $SPARK_HOME/spark-start.sh
57
58# Expose needed ports
59EXPOSE 7077 8080 4040 15002 22
60
61# Entrypoint config
62CMD ["/opt/spark/spark-start.sh"]

Création du fichier Docker Compose

Étapes à réaliser à partir du répertoire racine :

  1. Création du fichier compose.yml permettant de définir les différents éléments du cluster qui sera composé d’un nœud Master et de deux nœuds Workers
  2. Démarrer le cluster local avec la commande docker-compose up -d
  3. Arrêter le cluster local avec la commande docker-compose down

Contenu du fichier compose.yml :

  1services:
  2
  3  spark-master:
  4    image: spark4
  5    container_name: spark-master
  6    hostname: spark-master
  7    environment:
  8      - SPARK_MODE=master
  9      - SPARK_RPC_AUTHENTICATION_ENABLED=false
 10      - SPARK_RPC_ENCRYPTION_ENABLED=false
 11      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=false
 12      - SPARK_SSL_ENABLED=false
 13      - SPARK_PUBLIC_DNS=spark-master
 14      - SPARK_MASTER_HOST=spark-master
 15      - SPARK_MASTER_PORT=7077
 16      - SPARK_DRIVER_MEMORY=1g
 17      - SPARK_DRIVER_CORES=1
 18      - SPARK_EXECUTOR_MEMORY=1g
 19      - SPARK_MASTER_WEBUI_PORT=8080
 20    ports:
 21      - "4040:4040"   # Application UI (Job Details)
 22      - "8080:8080"   # Interface web du master
 23      - "7077:7077"   # Port de communication Spark
 24      - "15002:15002" # Port Spark Connect
 25      - "18080:18080" # Interface History Server
 26    deploy:
 27      resources:
 28        limits:
 29          cpus: '1'
 30          memory: 1G
 31    volumes:
 32      - ./data:/opt/spark/data
 33      - ./logs/events:/opt/spark/event_logs
 34      - ./application/src:/home/root/src
 35    networks:
 36      - spark-network
 37
 38
 39  spark-worker-1:
 40    image: spark4
 41    container_name: spark-worker-1
 42    hostname: spark-worker-1
 43    depends_on:
 44      - spark-master
 45    environment:
 46      - SPARK_MODE=worker
 47      - SPARK_MASTER_URL=spark://spark-master:7077
 48      - SPARK_RPC_AUTHENTICATE=false
 49      - SPARK_RPC_ENCRYPTION=false
 50      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
 51      - SPARK_SSL_ENABLED=no
 52      - SPARK_PUBLIC_DNS=spark-worker-1
 53      - SPARK_MASTER_HOST=spark-master
 54      - SPARK_MASTER_PORT=7077
 55      - SPARK_WORKER_CORES=2
 56      - SPARK_WORKER_MEMORY=2g
 57      - SPARK_EXECUTOR_MEMORY=1g
 58      - SPARK_WORKER_WEBUI_PORT=8081
 59    ports:
 60      - "8081:8081" # Interface web (worker)
 61    volumes:
 62      - ./data:/opt/spark/data
 63    networks:
 64      - spark-network
 65    deploy:
 66      resources:
 67        limits:
 68          cpus: '2'
 69          memory: 2G
 70
 71
 72  spark-worker-2:
 73    image: spark4
 74    container_name: spark-worker-2
 75    hostname: spark-worker-2
 76    depends_on:
 77      - spark-master
 78    environment:
 79      - SPARK_MODE=worker
 80      - SPARK_MASTER_URL=spark://spark-master:7077
 81      - SPARK_RPC_AUTHENTICATE=false
 82      - SPARK_RPC_ENCRYPTION=false
 83      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
 84      - SPARK_SSL_ENABLED=no
 85      - SPARK_PUBLIC_DNS=spark-worker-2
 86      - SPARK_MASTER_HOST=spark-master
 87      - SPARK_MASTER_PORT=7077
 88      - SPARK_WORKER_CORES=2
 89      - SPARK_WORKER_MEMORY=2g
 90      - SPARK_EXECUTOR_MEMORY=1g
 91      - SPARK_WORKER_WEBUI_PORT=8081
 92    ports:
 93      - "8082:8081" # Interface web (worker)
 94    volumes:
 95      - ./data:/opt/spark/data
 96    networks:
 97      - spark-network
 98    deploy:
 99      resources:
100        limits:
101          cpus: '2'
102          memory: 2G
103
104
105networks:
106  spark-network:
107    driver: bridge

Contenu du fichier de log après exécution de la commande docker-compose up -d :

1[+] up 4/4
2 ✔ Network spark4_spark-network Created
3 ✔ Container spark-master       Created
4 ✔ Container spark-worker-1     Created
5 ✔ Container spark-worker-2     Created

Contenu du fichier de log après exécution de la commande docker-compose down :

1[+] down 4/4
2 ✔ Container spark-worker-2     Removed
3 ✔ Container spark-worker-1     Removed
4 ✔ Container spark-master       Removed
5 ✔ Network spark4_spark-network Removed

Liste des ports et des interfaces

En se basant sur la configuration définie dans le fichier compose.yml :

Jeu de données

Dans le répertoire data :

  1. Création d’un répertoire input : mkdir -p data/input
  2. Création du fichier clients.csv pour stocker quelques lignes définissants des clients
  3. Création du fichier products.csv pour stocker quelques lignes définissant les produits
  4. Création du fichier orders.csv pour stocker quelques lignes définissant des événements d’achats des produits par les clients

Contenu du fichier clients.csv :

1id_client,lib_client,lib_city
21,Alice,Paris
32,Bob,Lyon
43,Charlie,Marseille
54,David,Lille
65,Eve,Bordeaux

Contenu du fichier products.csv :

1id_product,lib_product,mnt_prix_unit,lib_category
2101,Laptop,1200.0,Electronique
3102,Souris,25.0,Accessoires
4103,Clavier,45.0,Accessoires
5104,Ecran,200.0,Electronique
6105,Casque,80.0,Audio

Contenu du fichier orders.csv :

 1id_commande,id_client,id_product,nb_quantity,dt_commande
 21,1,101,1,2025-10-01
 32,2,102,2,2025-10-01
 43,3,103,1,2025-10-02
 54,1,104,1,2025-10-02
 65,5,105,2,2025-10-03
 76,2,101,1,2025-10-03
 87,4,103,3,2025-10-04
 98,1,102,1,2025-10-04
109,3,101,1,2025-10-05
1110,5,104,1,2025-10-05
1211,2,105,1,2025-10-06
1312,4,101,1,2025-10-06
1413,1,105,1,2025-10-07
1514,3,102,4,2025-10-07
1615,5,103,1,2025-10-08

Création d’un pipeline dans un projet SDP

Les étapes sont les suivantes :

  1. Installation du framework SDP en local
  2. Initialisation d’un projet SDP template en utilisant la commande spark-pipelines init
  3. Création des éléments nécessaires au pipeline

Installation du framework SDP en local

L’installation des éléments nécessaires pour pouvoir tester le framework SDP se fait avec la commande pip suivante : pip install pyspark[pipelines].

Suite à certain problème de compatibilité des dépendances sur mon environnement de travail, j’ai utilisé la commande suivante pour installer les éléments nécessaires avec les versions compatibles correspondantes : pip install pyspark==4.1.1 pyspark[pipelines] protobuf==6.33.0

Initialisation d’un projet SDP

Afin de pouvoir initialiser un projet SDP, le plus simple est de se positionner dans le répertoire application et d’exécuter la commande suivante spark-pipelines init --name LoadOrdersData.

Cela aura pour effet de créer un répertoire nommé LoadOrdersData dans le répertoire application ainsi que l’arborescence suivante :

  • pipeline-storage/ : Répertoire permettant de stocker les checkpoints lors de l’utilisation de Spark Streaming
  • transformations/ : Répertoire permettant de stocker l’ensemble des scripts Python et SQL définissant les pipelines
  • spark-pipeline.yml : Fichier de configuration du pipeline

Création des éléments nécessaire au pipeline

L’objectif est de créer le pipeline suivante :

graph TD;
    A(clients.csv) --load_bronze_orders.py--> D(sdp_bronze.raw_clients)
    B(products.csv) --load_bronze_orders.py--> E(sdp_bronze.raw_products)
    C(orders.csv) --load_bronze_orders.py--> F(sdp_bronze.raw_orders)
    D(sdp_bronze.raw_clients) --load_silver_orders.sql--> G(sdp_silver.refined_clients)
    E(sdp_bronze.raw_products) --load_silver_orders.sql--> H(sdp_silver.refined_products)
    F(sdp_bronze.raw_orders) --load_silver_orders.sql--> I(sdp_silver.refined_orders)
    H(sdp_silver.refined_products) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
    I(sdp_silver.refined_orders) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
    G(sdp_silver.refined_clients) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
    H(sdp_silver.refined_products) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
    I(sdp_silver.refined_orders) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)

Schema : schema_01

Les étapes sont les suivantes :

  • Création du script python load_bronze_orders.py permettant de définir les tables pour stocker les données des fichiers CSV
  • Création du script SQL load_silver_orders.sql permettant de raffiner les tables de la zone bronze dans la zone silver
  • Création du script SQL load_gold_orders.sql permettant d’exécuter des requêtes d’agrégations à partir des tables de la zone silver

Contenu du fichier load_bronze_orders.py :

 1from pyspark import pipelines as dp
 2from pyspark.sql import DataFrame, SparkSession
 3from pyspark.sql.functions import current_timestamp
 4
 5
 6PATH_DATA_INPUT="file:///opt/spark/data/input"
 7spark = SparkSession.active()
 8
 9
10# --- Bronze zone (raw loading) ---
11@dp.materialized_view(name="sdp_bronze.raw_orders")
12def raw_orders() -> DataFrame:
13    return spark.read \
14                .format("csv") \
15                .option("header",True) \
16                .option("inferSchema",True) \
17                .load(f"{PATH_DATA_INPUT}/orders.csv") \
18                .withColumn("ts_load_file", current_timestamp())
19
20@dp.materialized_view(name="sdp_bronze.raw_clients")
21def raw_clients() -> DataFrame:
22    return spark.read \
23                .format("csv") \
24                .option("header",True) \
25                .option("inferSchema",True) \
26                .load(f"{PATH_DATA_INPUT}/clients.csv") \
27                .withColumn("ts_load_file", current_timestamp())
28
29@dp.materialized_view(name="sdp_bronze.raw_products")
30def raw_products() -> DataFrame:
31    return spark.read \
32                .format("csv") \
33                .option("header",True) \
34                .option("inferSchema",True) \
35                .load(f"{PATH_DATA_INPUT}/products.csv") \
36                .withColumn("ts_load_file", current_timestamp())

Contenu du fichier load_silver_orders.sql :

 1--- Silver Zone (cleaned & typed) ---
 2CREATE MATERIALIZED VIEW sdp_silver.refined_orders
 3AS
 4SELECT int(id_commande) 
 5    ,int(id_client) 
 6    ,int(id_product)
 7    ,int(nb_quantity)
 8    ,to_date(dt_commande) as dt_commande
 9    ,to_timestamp(ts_load_file) as ts_load_file
10FROM sdp_bronze.raw_orders 
11WHERE id_commande IS NOT NULL
12;
13
14
15CREATE MATERIALIZED VIEW sdp_silver.refined_clients
16AS
17SELECT int(id_client)
18    ,string(lib_client) 
19    ,string(lib_city)
20    ,to_timestamp(ts_load_file) as ts_load_file
21FROM sdp_bronze.raw_clients 
22WHERE id_client IS NOT NULL
23;
24
25
26CREATE MATERIALIZED VIEW sdp_silver.refined_products
27AS
28SELECT int(id_product)
29    ,string(lib_product)
30    ,double(mnt_prix_unit)
31    ,string(lib_category)
32    ,to_timestamp(ts_load_file) as ts_load_file
33FROM sdp_bronze.raw_products 
34WHERE id_product IS NOT NULL
35;

Contenu du fichier load_gold_orders.sql :

 1--- Gold Zone (Business analyses) ---
 2CREATE MATERIALIZED VIEW sdp_gold.dist_by_client
 3AS
 4SELECT c.id_client, c.lib_client, ROUND(SUM(o.nb_quantity * p.mnt_prix_unit), 2) as total_expense
 5FROM sdp_silver.refined_orders o
 6INNER JOIN sdp_silver.refined_clients c 
 7ON (o.id_client = c.id_client)
 8INNER JOIN sdp_silver.refined_products p 
 9ON (o.id_product = p.id_product)
10GROUP BY c.id_client, c.lib_client
11ORDER BY total_expense DESC
12;
13
14CREATE MATERIALIZED VIEW sdp_gold.dist_by_product
15AS 
16SELECT p.id_product, p.lib_product, SUM(o.nb_quantity) as sales_volume
17FROM sdp_silver.refined_orders o
18INNER JOIN sdp_silver.refined_products p 
19ON (o.id_product = p.id_product)
20GROUP BY p.id_product, p.lib_product
21ORDER BY sales_volume DESC
22;

Création des éléments pour gérer l’environnement

Afin de pouvoir gérer proprement l’environnement, nous allons créer deux scripts PySpark dans le répertoire application/LoadOrdersData :

  • Le script init_schemas.py permettant de créer les schémas nécessaires à la création des tables du pipelines
    • sdp_bronze: pour le stockage des tables bronze
    • sdp_silver: pour le stockage des tables silver
    • sdp_gold: pour le stockage des tables gold
  • Le script check_list_tables.py permettant d’afficher l’ensemble des éléments créés

Contenu du script init_schemas.py :

 1from pyspark.sql import SparkSession
 2from pyspark.sql.functions import col
 3
 4REMOTE_URL = "sc://localhost:15002"
 5
 6spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()
 7
 8spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_bronze COMMENT 'Schema for SDP Pipelines - Bronze Zone'")
 9spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_silver COMMENT 'Schema for SDP Pipelines - Silver Zone'")
10spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_gold COMMENT 'Schema for SDP Pipelines - Gold Zone'")
11
12list_schemas = spark.sql("show schemas;").where(col("namespace").like('sdp_%'))
13
14nb_sdp_schema = list_schemas.count()
15
16if (nb_sdp_schema == 3):
17    print(f"[INFO] The SDP Schemas have been successfully created !")
18else :
19    print(f"[WARN] The SDP Schemas were not created successfully !")
20
21list_schemas.show()
22
23spark.stop()

Contenu du script check_list_tables.py :

 1from pyspark.sql import SparkSession
 2from pyspark.sql.functions import col
 3
 4REMOTE_URL = "sc://localhost:15002"
 5
 6spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()
 7
 8list_tables = ['clients','products','orders']
 9list_tables_gold = ['dist_by_client','dist_by_product']
10
11print(f"[INFO] Data from SDP_BRONZE :")
12for table in list_tables :
13    tmp_table = f"sdp_bronze.raw_{table}"
14    print(f"- {tmp_table} :")
15    spark.table(tmp_table).show(truncate=False)
16
17print(f"[INFO] Data from SDP_SILVER :")
18for table in list_tables :
19    tmp_table = f"sdp_silver.refined_{table}"
20    print(f"- {tmp_table} :")
21    spark.table(tmp_table).show(truncate=False)
22
23print(f"[INFO] Data from SDP_GOLD :")
24for table in list_tables_gold :
25    tmp_table = f"sdp_gold.{table}"
26    print(f"- {tmp_table} :")
27    spark.table(tmp_table).show(truncate=False)
28
29spark.stop()

Résultat de l’exécution du pipeline créé

Exécution du pipeline

L’ensemble des commandes seront exécutés à partir du répertoire application/LoadOrdersData/

L’ordre d’exécution est le suivant :

  1. Exécution du script d’initialisation des schémas avec la commande python init_schemas.py
  2. Vérifier que le message affiché par le script init_schemas.py est [INFO] The SDP Schemas have been successfully created !
  3. Exécution du pipeline avec la commande spark-pipelines run --spec spark-pipeline.yml
  4. Exécution du script de vérification des objets créés avec la commande python check_list_tables.py

Résultat de l’exécution

Résultat de l’exécution de la commande python init_schemas.py :

1[INFO] The SDP Schemas have been successfully created !
2+----------+
3| namespace|
4+----------+
5|sdp_bronze|
6|  sdp_gold|
7|sdp_silver|
8+----------+

Résultat de l’exécution de la commande spark-pipelines run --spec spark-pipeline.yml :

 12026-03-30 19:31:25: Loading pipeline spec from spark-pipeline.yml...
 22026-03-30 19:31:25: Creating Spark session...
 32026-03-30 19:31:25: Creating dataflow graph...
 42026-03-30 19:31:25: Registering graph elements...
 52026-03-30 19:31:25: Loading definitions. Root directory: '.../application/LoadOrdersData'.
 62026-03-30 19:31:25: Found 3 files matching glob 'transformations/**/*'
 72026-03-30 19:31:25: Importing .../application/LoadOrdersData/transformations/load_bronze_orders.py...
 82026-03-30 19:31:27: Registering SQL file .../application/LoadOrdersData/transformations/load_gold_orders.sql...
 92026-03-30 19:31:28: Registering SQL file .../application/LoadOrdersData/transformations/load_silver_orders.sql...
102026-03-30 19:31:28: Starting run...
112026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_product is QUEUED.
122026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_orders is QUEUED.
132026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is QUEUED.
142026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_products is QUEUED.
152026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is QUEUED.
162026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is QUEUED.
172026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_client is QUEUED.
182026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_clients is QUEUED.
192026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is PLANNING.
202026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is STARTING.
212026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is RUNNING.
222026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is PLANNING.
232026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is STARTING.
242026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is RUNNING.
252026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is PLANNING.
262026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is STARTING.
272026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is RUNNING.
282026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_orders has COMPLETED.
292026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_products has COMPLETED.
302026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_clients has COMPLETED.
312026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is PLANNING.
322026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is STARTING.
332026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is RUNNING.
342026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is PLANNING.
352026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is STARTING.
362026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is RUNNING.
372026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is PLANNING.
382026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is STARTING.
392026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is RUNNING.
402026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_clients has COMPLETED.
412026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_products has COMPLETED.
422026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_orders has COMPLETED.
432026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is PLANNING.
442026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is STARTING.
452026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is RUNNING.
462026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is PLANNING.
472026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is STARTING.
482026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is RUNNING.
492026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_product has COMPLETED.
502026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_client has COMPLETED.
512026-03-30 19:32:17: Run is COMPLETED.

Résultat de l’exécution de la commande python check_list_tables.py :

  1[INFO] Data from SDP_BRONZE :
  2- sdp_bronze.raw_clients :
  3+---------+----------+---------+--------------------------+
  4|id_client|lib_client|lib_city |ts_load_file              |
  5+---------+----------+---------+--------------------------+
  6|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
  7|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
  8|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
  9|4        |David     |Lille    |2026-03-30 19:31:38.862824|
 10|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
 11+---------+----------+---------+--------------------------+
 12
 13- sdp_bronze.raw_products :
 14+----------+-----------+-------------+------------+--------------------------+
 15|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
 16+----------+-----------+-------------+------------+--------------------------+
 17|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
 18|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
 19|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
 20|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
 21|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
 22+----------+-----------+-------------+------------+--------------------------+
 23
 24- sdp_bronze.raw_orders :
 25+-----------+---------+----------+-----------+-----------+--------------------------+
 26|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
 27+-----------+---------+----------+-----------+-----------+--------------------------+
 28|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
 29|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
 30|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 31|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 32|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
 33|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
 34|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
 35|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
 36|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 37|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 38|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 39|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 40|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
 41|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
 42|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
 43+-----------+---------+----------+-----------+-----------+--------------------------+
 44
 45[INFO] Data from SDP_SILVER :
 46- sdp_silver.refined_clients :
 47+---------+----------+---------+--------------------------+
 48|id_client|lib_client|lib_city |ts_load_file              |
 49+---------+----------+---------+--------------------------+
 50|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
 51|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
 52|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
 53|4        |David     |Lille    |2026-03-30 19:31:38.862824|
 54|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
 55+---------+----------+---------+--------------------------+
 56
 57- sdp_silver.refined_products :
 58+----------+-----------+-------------+------------+--------------------------+
 59|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
 60+----------+-----------+-------------+------------+--------------------------+
 61|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
 62|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
 63|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
 64|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
 65|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
 66+----------+-----------+-------------+------------+--------------------------+
 67
 68- sdp_silver.refined_orders :
 69+-----------+---------+----------+-----------+-----------+--------------------------+
 70|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
 71+-----------+---------+----------+-----------+-----------+--------------------------+
 72|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
 73|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
 74|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 75|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 76|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
 77|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
 78|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
 79|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
 80|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 81|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 82|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 83|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 84|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
 85|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
 86|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
 87+-----------+---------+----------+-----------+-----------+--------------------------+
 88
 89[INFO] Data from SDP_GOLD :
 90- sdp_gold.dist_by_client :
 91+---------+----------+-------------+
 92|id_client|lib_client|total_expense|
 93+---------+----------+-------------+
 94|1        |Alice     |1505.0       |
 95|3        |Charlie   |1345.0       |
 96|4        |David     |1335.0       |
 97|2        |Bob       |1330.0       |
 98|5        |Eve       |405.0        |
 99+---------+----------+-------------+
100
101- sdp_gold.dist_by_product :
102+----------+-----------+------------+
103|id_product|lib_product|sales_volume|
104+----------+-----------+------------+
105|102       |Souris     |7           |
106|103       |Clavier    |5           |
107|105       |Casque     |4           |
108|101       |Laptop     |4           |
109|104       |Ecran      |2           |
110+----------+-----------+------------+