Spark : v4.x - Features - Spark Declarative Pipelines
In this article, you will find information about the Spark Declarative Pipelines (SDP) feature in Spark v4.1.
Introduction
In June 2022, Databricks announced the general availability of a new feature named Delta Live Tables (DLT) within its ecosystem, enabling the declarative development of Spark processing pipelines. This feature was primarily based on the Delta Lake data format and the Unity Catalog data catalog.
In June 2025, during the Data+AI Summit 2025 event, Databricks publicly announced the open-sourcing of a core part of its declarative data processing framework based on Apache Spark. However, a large number of features and optimizations were left out and remain exclusively available on the Databricks platform.
In July 2025, Databricks announced the renaming of the Delta Live Tables feature to Lakeflow Declarative Pipelines, which became the official name of this feature within the Databricks platform.
Information
Spark Declarative Pipelines (SDP) is a new component of Apache Spark 4.1 designed to let developers focus on data transformations rather than on explicitly managing dependencies and pipeline execution. In practice, the SDP framework lets you describe what the data should look like, and Spark handles the “how”: task scheduling, parallelism, checkpoints, and retries.
The API relies on Python decorators and SQL, with a dedicated command-line client (spark-pipelines) for executing pipelines.
The SDP framework manages execution order, dependency resolution, and incremental processing, which simplifies pipeline development, maintenance, testing, and monitoring.
The framework is similar to tools like dbt or SQLMesh, but it is still immature and has a very limited ecosystem (features, documentation, optimizations, capabilities, compatibility, etc.).
The official programming guide is extremely clear and concise for understanding the key concepts.
Key elements :
- The pipeline definition YAML file defines the structural elements for parsing and executing flows.
- You can use Python scripts (with decorators) and SQL scripts.
- Three types of datasets can be used:
  - Streaming Table : Used to create a table to store data processed with Spark Streaming.
    - Python syntax : @dp.table or dp.create_streaming_table
    - SQL syntax : CREATE STREAMING TABLE
  - Materialized View : Used to create a table to store batch data.
    - Python syntax : @dp.materialized_view
    - SQL syntax : CREATE MATERIALIZED VIEW
  - Temporary View : Used to create a temporary view for intermediate results during pipeline execution.
    - Python syntax : @dp.temporary_view
    - SQL syntax : CREATE TEMPORARY VIEW
Specific SDP framework commands :
- spark-pipelines init: To initialize a new project.
- spark-pipelines dry-run --spec pipeline_def.yaml: To validate a pipeline without executing it (very useful for validating developments with CI/CD pipelines).
- spark-pipelines run --spec pipeline_def.yaml: To execute the desired pipeline.
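The dry-run command is the natural hook for the CI/CD validation mentioned above. Below is a minimal Python sketch of such a gate. It assumes the spark-pipelines CLI is on the PATH and that dry-run exits with a non-zero status when validation fails (an assumption to verify against your Spark version). The helper names build_command and dry_run_gate are hypothetical, and the runner parameter exists only so that the CLI call can be replaced in tests.

```python
import subprocess
from typing import Callable, List

# Sub-commands from the list above that take a --spec file ("init" does not).
SPEC_ACTIONS = {"dry-run", "run"}


def build_command(action: str, spec_path: str) -> List[str]:
    """Build the spark-pipelines command line for an action on a spec file."""
    if action not in SPEC_ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    return ["spark-pipelines", action, "--spec", spec_path]


def dry_run_gate(
    spec_path: str,
    runner: Callable[[List[str]], subprocess.CompletedProcess] = subprocess.run,
) -> bool:
    """Validate the pipeline without executing it.

    Returns True when the CLI exits with status 0. Treating a non-zero exit
    status as a validation failure is an assumption of this sketch.
    """
    completed = runner(build_command("dry-run", spec_path))
    return completed.returncode == 0
```

In a CI job, a step such as sys.exit(0 if dry_run_gate("pipeline_def.yaml") else 1) would then fail the build whenever validation fails.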
General operation of the SDP framework :
- Execution start.
- Reading of the pipeline definition file.
- Creation of a Spark session based on the configuration elements defined in the pipeline definition file.
- Parsing of all Python or SQL files listed in the pipeline definition file.
- Generation of the dataflow graph, allowing the engine to plan the execution order of the various pipeline elements based on declared dependencies.
- Execution of the various flows in the defined order (parallelization and sequencing based on dependencies).
- Execution end.
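The following simplified model makes the graph-based planning concrete. It is not SDP's internal scheduler. It is written with Python's standard graphlib module, uses the dependencies of the pipeline built later in this article, and groups datasets into "waves": every dataset in a wave depends only on datasets from earlier waves, so the datasets within a wave could run in parallel. The function name execution_waves is illustrative.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Dependencies of the example pipeline: each dataset maps to the datasets it reads.
DEPENDENCIES: Dict[str, Set[str]] = {
    "sdp_bronze.raw_clients": set(),
    "sdp_bronze.raw_products": set(),
    "sdp_bronze.raw_orders": set(),
    "sdp_silver.refined_clients": {"sdp_bronze.raw_clients"},
    "sdp_silver.refined_products": {"sdp_bronze.raw_products"},
    "sdp_silver.refined_orders": {"sdp_bronze.raw_orders"},
    "sdp_gold.dist_by_client": {
        "sdp_silver.refined_orders",
        "sdp_silver.refined_clients",
        "sdp_silver.refined_products",
    },
    "sdp_gold.dist_by_product": {"sdp_silver.refined_orders", "sdp_silver.refined_products"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group datasets so that each wave depends only on earlier waves."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to get a stable output
        waves.append(ready)
        sorter.done(*ready)  # marking them done unlocks their dependents
    return waves


for number, wave in enumerate(execution_waves(DEPENDENCIES), start=1):
    print(f"wave {number}: {wave}")
```

Running it prints three waves: the three bronze tables, then the three silver tables, then the two gold tables. This is consistent with the order in which flows complete in the execution log of the example pipeline. The real engine may start a flow as soon as its own inputs are ready rather than waiting for a whole wave.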
Documentation :
Warning : During testing, the framework struggled to build the dependency graph when the code was written entirely with Python decorators. Many errors came from the framework looking for a table in the data catalog rather than among the datasets defined in the pipeline scripts. SQL code seems to be much better handled for the time being.
Advantages
- Automatic dependency resolution and pipeline execution optimization : This is the most tangible benefit. The order in which Python functions or SQL queries are defined does not matter: the SDP framework analyzes the graph and determines the optimal execution order. It runs independent queries in parallel and executes dependent queries sequentially.
- Enables pre-execution validation with dry-run : The dry-run feature detects syntax errors (Python or SQL), analysis errors (missing tables or columns), and graph validation errors such as cyclic dependencies, without reading or writing any data. This allows for automated checks within CI/CD pipelines.
- Format-agnostic : The SDP framework supports all data sources and catalogs supported by Spark. The Iceberg or Delta Lake data formats are recommended.
- Easier maintenance : Understanding and maintaining a pipeline based on a declarative framework is much easier than managing a set of imperative PySpark processes.
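The kind of graph checks described for dry-run (unresolved inputs, cyclic dependencies) can be illustrated without touching any data. This is a sketch of the idea only, not how SDP's dry-run is implemented. validate_graph and its existing parameter (tables assumed to already exist in the catalog) are hypothetical, and cycle detection relies on graphlib.CycleError from the Python standard library.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, Iterable, List, Set


def validate_graph(deps: Dict[str, Set[str]], existing: Iterable[str] = ()) -> List[str]:
    """Report graph problems without reading or writing any data.

    deps maps each dataset defined by the pipeline to the datasets it reads.
    existing lists tables assumed to exist outside the pipeline (hypothetical).
    """
    problems: List[str] = []
    known = set(deps) | set(existing)
    # Analysis-style check: every input must be defined in the pipeline or already exist.
    for dataset in sorted(deps):
        for name in sorted(deps[dataset] - known):
            problems.append(f"{dataset}: unresolved input {name}")
    # Graph check: prepare() raises CycleError when the dependencies loop back.
    try:
        TopologicalSorter(deps).prepare()
    except CycleError as err:
        cycle = err.args[1]  # list of nodes; the first and last are the same
        problems.append("cycle: " + " -> ".join(cycle))
    return problems


# Deliberately broken example: a two-node cycle plus an undefined input.
BROKEN_DEPS = {
    "sdp_bronze.raw_orders": {"sdp_silver.refined_orders"},
    "sdp_silver.refined_orders": {"sdp_bronze.raw_orders"},
    "sdp_gold.dist_by_product": {"sdp_silver.refined_orders", "sdp_silver.refined_products"},
}

for problem in validate_graph(BROKEN_DEPS):
    print(problem)
```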
Limitations
- Ecosystem immaturity (features and tooling) : The SDP framework is in a very active development cycle. The behavior of certain features may change between Spark versions. Not everything is documented yet. It is far too early for production adoption. Popular orchestrators (Apache Airflow, Prefect, Dagster) do not necessarily support this framework (or do so in a very limited way).
- Premium features remain exclusive to Databricks : Features not available in the open-source SDP framework include, for example:
  - Auto CDC (Change Data Capture)
  - Expectations with automatic enforcement (@dp.expect, automated quarantine of invalid rows)
  - Visual data lineage and integrated advanced observability
  - Queryable event logs for pipeline auditing
- Less intuitive debugging compared to imperative code : Stack traces reference internal SDP framework classes rather than the specific lines of Python or SQL code. This is particularly critical for teams used to debugging Spark jobs with commands like spark.read or df.show().
- Function and optimization limitations : There are far fewer options for handling complex pipelines and processing; this may require reverting to imperative code (e.g., state management for streaming, need for unsupported functions like pivot, …).
Codes
Approach
To test the SDP framework, we will follow these steps :
- Local Spark v4.1.1 cluster creation
- Creation of a limited dataset
- Creation of a pipeline within an SDP framework project
- Execution of the created pipeline
The folders used for this project are :
- application: Folder for storing all scripts
- data: Folder for storing datasets (input and output)
- logs: Folder for storing Spark cluster execution logs
- spark-image-docker: Folder for storing the files required to create the Docker image
Spark Cluster
To test this SDP framework with Spark version 4.1.1, we will create a Docker image and the necessary components to set up a local cluster consisting of one Master node (driver) and two Worker nodes.
Creating a Docker Image
Steps to perform from the spark-image-docker directory :
- Create the spark-defaults.conf file to define the Spark cluster configuration elements.
- Create the spark-env.sh file to define the environment variables for the Spark cluster.
- Create the spark-start.sh file to define the execution script for the Spark cluster services.
- Create the Dockerfile to define the Spark image content for cluster creation.
- Execute the Docker image creation command from the root directory (spark-image-docker is passed as the build context) : docker build -t spark4 spark-image-docker --no-cache
Content of the spark-defaults.conf file :
# --- Force JARs into Classpath ---
spark.driver.extraClassPath /opt/spark/jars/*
spark.executor.extraClassPath /opt/spark/jars/*

# --- History Server Configuration ---
spark.eventLog.enabled true
spark.eventLog.dir file:///opt/spark/event_logs
spark.history.fs.logDirectory file:///opt/spark/event_logs

# --- Delta Configuration ---
spark.connect.extensions.relation.classes org.apache.spark.sql.connect.delta.DeltaRelationPlugin
spark.connect.extensions.command.classes org.apache.spark.sql.connect.delta.DeltaCommandPlugin
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension

# --- Database Configurations ---
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

# --- Default Catalog configuration ---
spark.sql.defaultCatalog spark_catalog

# --- Default warehouse local path configuration ---
spark.sql.warehouse.dir file:///opt/spark/data/warehouse
Content of the spark-env.sh file :
#!/usr/bin/env bash

export SPARK_LOCAL_IP=`hostname -i`
export SPARK_PUBLIC_DNS=`hostname -f`
Content of the spark-start.sh file :
#!/bin/bash

# Start the SSH daemon
/usr/sbin/sshd
if [ $? -ne 0 ]; then
    echo "Failed to start SSH server. Exiting."
    exit 1
fi

if [ "$SPARK_MODE" = "master" ]; then
    echo "Starting Spark Master..."
    # Spark Master/Driver
    $SPARK_HOME/sbin/start-master.sh
    # Spark Connect
    $SPARK_HOME/sbin/start-connect-server.sh
    # History server
    $SPARK_HOME/sbin/start-history-server.sh
elif [ "$SPARK_MODE" = "worker" ]; then
    echo "Starting Spark Worker..."
    # Spark Worker
    $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER_URL
else
    echo "Unknown SPARK_MODE: $SPARK_MODE"
    exit 1
fi

# Keep the container alive
tail -f $SPARK_HOME/logs/*
Content of the Dockerfile file :
# Use OpenJDK base image
FROM eclipse-temurin:21-jdk-jammy


# Define env variables
ENV SPARK_MASTER="spark://spark-master:7077"
ENV SPARK_MASTER_HOST=spark-master
ENV SPARK_MASTER_PORT=7077
ENV PYSPARK_PYTHON=python3
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
ENV DELTA_VERSION="4.1.0"
ENV SPARK_VERSION="4.1.1"
ENV SCALA_VERSION="2.13"


# Install tools
RUN apt-get update \
    && apt-get install -y --no-install-recommends wget tar iputils-ping rsync openssh-server openssh-client \
    && apt-get install -y --no-install-recommends python3 python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Set up SSH keys and client configuration
RUN mkdir -p /root/.ssh/ \
    && ssh-keygen -t rsa -f /root/.ssh/id_rsa -q -N "" \
    && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
    && chmod 600 /root/.ssh/authorized_keys \
    && echo "Host *" >> /root/.ssh/config \
    && echo " StrictHostKeyChecking no" >> /root/.ssh/config \
    && chmod 600 /root/.ssh/config \
    && mkdir -p /var/run/sshd \
    && ssh-keygen -A

# Install Spark
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
RUN tar -xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz \
    && rm spark-${SPARK_VERSION}-bin-hadoop3.tgz \
    && mv spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME} \
    && chown -R root:root ${SPARK_HOME} \
    && mkdir -p ${SPARK_HOME}/logs \
    && mkdir -p ${SPARK_HOME}/event_logs

# Download the Delta Spark JAR directly into Spark's main jars directory
# Spark automatically loads all JARs from this folder on startup.
RUN wget https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VERSION}/${DELTA_VERSION}/delta-spark_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
RUN wget https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
RUN wget https://repo1.maven.org/maven2/io/delta/delta-connect-client_${SCALA_VERSION}/${DELTA_VERSION}/delta-connect-client_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/

# Set up Spark configuration for logging and history server
COPY spark-defaults.conf $SPARK_HOME/conf/spark-defaults.conf

# Set up Spark configuration scripts
COPY spark-env.sh $SPARK_HOME/conf/spark-env.sh
COPY spark-start.sh $SPARK_HOME/spark-start.sh
RUN chmod +x $SPARK_HOME/conf/spark-env.sh
RUN chmod +x $SPARK_HOME/spark-start.sh

# Expose needed ports
EXPOSE 7077 8080 4040 15002 22

# Entrypoint config
CMD ["/opt/spark/spark-start.sh"]
Creating the Docker Compose file
Steps to perform from the root directory :
- Create the compose.yml file to define the various cluster components, which will consist of one Master node and two Worker nodes.
- Start the local cluster using the command docker-compose up -d
- Stop the local cluster using the command docker-compose down
Content of the compose.yml file :
services:

  spark-master:
    image: spark4
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=false
      - SPARK_RPC_ENCRYPTION_ENABLED=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=false
      - SPARK_SSL_ENABLED=false
      - SPARK_PUBLIC_DNS=spark-master
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_DRIVER_MEMORY=1g
      - SPARK_DRIVER_CORES=1
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_MASTER_WEBUI_PORT=8080
    ports:
      - "4040:4040"   # Application UI (Job Details)
      - "8080:8080"   # Master web UI
      - "7077:7077"   # Spark communication port
      - "15002:15002" # Spark Connect port
      - "18080:18080" # History Server UI
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1G
    volumes:
      - ./data:/opt/spark/data
      - ./logs/events:/opt/spark/event_logs
      - ./application/src:/home/root/src
    networks:
      - spark-network


  spark-worker-1:
    image: spark4
    container_name: spark-worker-1
    hostname: spark-worker-1
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_RPC_AUTHENTICATE=false
      - SPARK_RPC_ENCRYPTION=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
      - SPARK_SSL_ENABLED=no
      - SPARK_PUBLIC_DNS=spark-worker-1
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=2g
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8081:8081" # Worker web UI
    volumes:
      - ./data:/opt/spark/data
    networks:
      - spark-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G


  spark-worker-2:
    image: spark4
    container_name: spark-worker-2
    hostname: spark-worker-2
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_RPC_AUTHENTICATE=false
      - SPARK_RPC_ENCRYPTION=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
      - SPARK_SSL_ENABLED=no
      - SPARK_PUBLIC_DNS=spark-worker-2
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=2g
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8082:8081" # Worker web UI
    volumes:
      - ./data:/opt/spark/data
    networks:
      - spark-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G


networks:
  spark-network:
    driver: bridge
Content of the log file after running the command docker-compose up -d :
[+] up 4/4
 ✔ Network spark4_spark-network Created
 ✔ Container spark-master Created
 ✔ Container spark-worker-1 Created
 ✔ Container spark-worker-2 Created
Content of the log file after running the command docker-compose down :
[+] down 4/4
 ✔ Container spark-worker-2 Removed
 ✔ Container spark-worker-1 Removed
 ✔ Container spark-master Removed
 ✔ Network spark4_spark-network Removed
List of ports and interfaces
Based on the configuration in the compose.yml file:
- 4040: Communication port for the application UI (Jobs UI)
- 7077: Communication port for the Spark cluster (internal)
- 8080: Communication port for the Master node web interface (driver)
- 8081: Communication port for the Worker #1 node web interface
- 8082: Communication port for the Worker #2 node web interface
- 15002: Communication port for the Spark Connect server
- 18080: Communication port for the History Server interface
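Because docker-compose up -d returns as soon as the containers are created, the Spark services inside them may still be starting. A small standard-library readiness check, such as the hypothetical wait_for_port helper below, can poll a port from the list above (for example 15002 for Spark Connect) before any client script is run. It only checks that a TCP connection succeeds, not that the service behind the port is fully initialized.

```python
import socket
import time

# Host ports published by compose.yml, as listed above.
PORTS = {
    "Spark Connect": 15002,
    "Master web UI": 8080,
    "History Server": 18080,
}


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to host:port succeeds.

    Returns True as soon as a connection is accepted, or False once `timeout`
    seconds have elapsed. A successful connection only proves that something
    is listening, not that the Spark service is ready to serve requests.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("localhost", PORTS["Spark Connect"], timeout=60) could be called at the top of a client script before SparkSession.builder.remote(...) is invoked.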
Dataset
In the data directory:
- Create an input directory: mkdir -p data/input
- Create the clients.csv file to store a few rows defining customers
- Create the products.csv file to store a few rows defining products
- Create the orders.csv file to store a few rows defining product purchase events by customers
Content of the clients.csv file :
id_client,lib_client,lib_city
1,Alice,Paris
2,Bob,Lyon
3,Charlie,Marseille
4,David,Lille
5,Eve,Bordeaux
Content of the products.csv file :
id_product,lib_product,mnt_prix_unit,lib_category
101,Laptop,1200.0,Electronique
102,Souris,25.0,Accessoires
103,Clavier,45.0,Accessoires
104,Ecran,200.0,Electronique
105,Casque,80.0,Audio
Content of the orders.csv file :
id_commande,id_client,id_product,nb_quantity,dt_commande
1,1,101,1,2025-10-01
2,2,102,2,2025-10-01
3,3,103,1,2025-10-02
4,1,104,1,2025-10-02
5,5,105,2,2025-10-03
6,2,101,1,2025-10-03
7,4,103,3,2025-10-04
8,1,102,1,2025-10-04
9,3,101,1,2025-10-05
10,5,104,1,2025-10-05
11,2,105,1,2025-10-06
12,4,101,1,2025-10-06
13,1,105,1,2025-10-07
14,3,102,4,2025-10-07
15,5,103,1,2025-10-08
Creating a pipeline in an SDP project
The steps are as follows:
- Install the SDP framework locally
- Initialize an SDP template project using the spark-pipelines init command
- Create the necessary components for the pipeline
Installing the SDP framework locally
To install the components needed to test the SDP framework, use the following pip command: pip install pyspark[pipelines].
Due to some dependency compatibility issues in my work environment, I used the following command to install the necessary components with the corresponding compatible versions: pip install pyspark==4.1.1 pyspark[pipelines] protobuf==6.33.0
Initializing an SDP project
To initialize an SDP project, the easiest way is to navigate to the application directory and run the following command: spark-pipelines init --name LoadOrdersData.
This will create a directory named LoadOrdersData within the application directory, along with the following directory structure :
- pipeline-storage/: Directory for storing checkpoints when using Spark Streaming
- transformations/: Directory for storing all Python and SQL scripts defining the pipelines
- spark-pipeline.yml: Pipeline configuration file
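Before a run, it can be useful to list which definition files the pipeline is expected to pick up. The sketch below is a hypothetical pre-flight check. It reuses the file and folder names listed above and the glob transformations/**/* reported in the run log of the example pipeline. The content of spark-pipeline.yml is not reproduced in this article, so treating that glob as the effective one is an assumption.

```python
from pathlib import Path
from typing import List

# Names from the structure generated by "spark-pipelines init" (see the list above).
SPEC_FILE = "spark-pipeline.yml"
# Glob reported in the run log; assumed to be the one configured in the spec file.
TRANSFORMATIONS_GLOB = "transformations/**/*"


def list_definition_files(project_dir: str) -> List[Path]:
    """Return the definition files under transformations/, relative to the project."""
    root = Path(project_dir)
    if not (root / SPEC_FILE).is_file():
        raise FileNotFoundError(f"missing {SPEC_FILE} in {root}")
    # "**" also matches transformations/ itself, so top-level scripts are included.
    files = [p for p in root.glob(TRANSFORMATIONS_GLOB) if p.is_file()]
    return sorted(p.relative_to(root) for p in files)
```

Run from application/LoadOrdersData once the scripts below are created, list_definition_files(".") would be expected to return the three transformation files.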
Creating the necessary components for the pipeline
The goal is to create the following pipeline :
graph TD;
A(clients.csv) --load_bronze_orders.py--> D(sdp_bronze.raw_clients)
B(products.csv) --load_bronze_orders.py--> E(sdp_bronze.raw_products)
C(orders.csv) --load_bronze_orders.py--> F(sdp_bronze.raw_orders)
D(sdp_bronze.raw_clients) --load_silver_orders.sql--> G(sdp_silver.refined_clients)
E(sdp_bronze.raw_products) --load_silver_orders.sql--> H(sdp_silver.refined_products)
F(sdp_bronze.raw_orders) --load_silver_orders.sql--> I(sdp_silver.refined_orders)
H(sdp_silver.refined_products) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
I(sdp_silver.refined_orders) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
G(sdp_silver.refined_clients) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
H(sdp_silver.refined_products) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
I(sdp_silver.refined_orders) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
The steps are as follows:
- Create the Python script load_bronze_orders.py to define the tables storing data from the CSV files
- Create the SQL script load_silver_orders.sql to refine the tables from the bronze zone into the silver zone
- Create the SQL script load_gold_orders.sql to execute aggregation queries based on the tables from the silver zone
Content of the load_bronze_orders.py file :
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import current_timestamp


PATH_DATA_INPUT="file:///opt/spark/data/input"
spark = SparkSession.active()


# --- Bronze zone (raw loading) ---
@dp.materialized_view(name="sdp_bronze.raw_orders")
def raw_orders() -> DataFrame:
    return spark.read \
        .format("csv") \
        .option("header",True) \
        .option("inferSchema",True) \
        .load(f"{PATH_DATA_INPUT}/orders.csv") \
        .withColumn("ts_load_file", current_timestamp())

@dp.materialized_view(name="sdp_bronze.raw_clients")
def raw_clients() -> DataFrame:
    return spark.read \
        .format("csv") \
        .option("header",True) \
        .option("inferSchema",True) \
        .load(f"{PATH_DATA_INPUT}/clients.csv") \
        .withColumn("ts_load_file", current_timestamp())

@dp.materialized_view(name="sdp_bronze.raw_products")
def raw_products() -> DataFrame:
    return spark.read \
        .format("csv") \
        .option("header",True) \
        .option("inferSchema",True) \
        .load(f"{PATH_DATA_INPUT}/products.csv") \
        .withColumn("ts_load_file", current_timestamp())
Content of the load_silver_orders.sql file :
--- Silver Zone (cleaned & typed) ---
CREATE MATERIALIZED VIEW sdp_silver.refined_orders
AS
SELECT int(id_commande)
      ,int(id_client)
      ,int(id_product)
      ,int(nb_quantity)
      ,to_date(dt_commande) as dt_commande
      ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_orders
WHERE id_commande IS NOT NULL
;


CREATE MATERIALIZED VIEW sdp_silver.refined_clients
AS
SELECT int(id_client)
      ,string(lib_client)
      ,string(lib_city)
      ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_clients
WHERE id_client IS NOT NULL
;


CREATE MATERIALIZED VIEW sdp_silver.refined_products
AS
SELECT int(id_product)
      ,string(lib_product)
      ,double(mnt_prix_unit)
      ,string(lib_category)
      ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_products
WHERE id_product IS NOT NULL
;
Content of the load_gold_orders.sql file :
--- Gold Zone (Business analyses) ---
CREATE MATERIALIZED VIEW sdp_gold.dist_by_client
AS
SELECT c.id_client, c.lib_client, ROUND(SUM(o.nb_quantity * p.mnt_prix_unit), 2) as total_expense
FROM sdp_silver.refined_orders o
INNER JOIN sdp_silver.refined_clients c
ON (o.id_client = c.id_client)
INNER JOIN sdp_silver.refined_products p
ON (o.id_product = p.id_product)
GROUP BY c.id_client, c.lib_client
ORDER BY total_expense DESC
;

CREATE MATERIALIZED VIEW sdp_gold.dist_by_product
AS
SELECT p.id_product, p.lib_product, SUM(o.nb_quantity) as sales_volume
FROM sdp_silver.refined_orders o
INNER JOIN sdp_silver.refined_products p
ON (o.id_product = p.id_product)
GROUP BY p.id_product, p.lib_product
ORDER BY sales_volume DESC
;
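To sanity-check the two gold queries, the same inner joins and aggregations can be recomputed in plain Python over the sample data, without Spark. This is only a verification sketch (read_rows and gold_aggregates are hypothetical helpers). total_expense is SUM(nb_quantity * mnt_prix_unit) per client, and sales_volume is SUM(nb_quantity) per product.

```python
import csv
import io
from collections import defaultdict

# Sample data from the three CSV files of the Dataset section.
CLIENTS_CSV = """id_client,lib_client,lib_city
1,Alice,Paris
2,Bob,Lyon
3,Charlie,Marseille
4,David,Lille
5,Eve,Bordeaux
"""
PRODUCTS_CSV = """id_product,lib_product,mnt_prix_unit,lib_category
101,Laptop,1200.0,Electronique
102,Souris,25.0,Accessoires
103,Clavier,45.0,Accessoires
104,Ecran,200.0,Electronique
105,Casque,80.0,Audio
"""
ORDERS_CSV = """id_commande,id_client,id_product,nb_quantity,dt_commande
1,1,101,1,2025-10-01
2,2,102,2,2025-10-01
3,3,103,1,2025-10-02
4,1,104,1,2025-10-02
5,5,105,2,2025-10-03
6,2,101,1,2025-10-03
7,4,103,3,2025-10-04
8,1,102,1,2025-10-04
9,3,101,1,2025-10-05
10,5,104,1,2025-10-05
11,2,105,1,2025-10-06
12,4,101,1,2025-10-06
13,1,105,1,2025-10-07
14,3,102,4,2025-10-07
15,5,103,1,2025-10-08
"""


def read_rows(text):
    """Parse CSV text with a header line into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))


def gold_aggregates():
    """Recompute dist_by_client and dist_by_product with inner-join semantics."""
    clients = {int(r["id_client"]): r["lib_client"] for r in read_rows(CLIENTS_CSV)}
    products = {int(r["id_product"]): (r["lib_product"], float(r["mnt_prix_unit"]))
                for r in read_rows(PRODUCTS_CSV)}
    expense = defaultdict(float)  # id_client -> SUM(nb_quantity * mnt_prix_unit)
    volume = defaultdict(int)     # id_product -> SUM(nb_quantity)
    for order in read_rows(ORDERS_CSV):
        id_client = int(order["id_client"])
        id_product = int(order["id_product"])
        quantity = int(order["nb_quantity"])
        if id_product not in products:  # the INNER JOIN with products drops the row
            continue
        volume[id_product] += quantity
        if id_client in clients:  # dist_by_client also joins with clients
            expense[id_client] += quantity * products[id_product][1]
    by_client = sorted(((c, clients[c], round(total, 2)) for c, total in expense.items()),
                       key=lambda row: row[2], reverse=True)
    by_product = sorted(((p, products[p][0], total) for p, total in volume.items()),
                        key=lambda row: row[2], reverse=True)
    return by_client, by_product


by_client, by_product = gold_aggregates()
print(by_client)
print(by_product)
```

The figures should match the gold tables in the execution result (for example 1505.0 for Alice and a sales volume of 7 for Souris). Rows that tie on the sort key, such as Laptop and Casque with 4 units each, have no guaranteed relative order in either implementation.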
Creating elements to manage the environment
To properly manage the environment, we will create two PySpark scripts in the application/LoadOrdersData directory:
- The init_schemas.py script will create the schemas needed for the pipeline tables:
  - sdp_bronze: For storing bronze tables
  - sdp_silver: For storing silver tables
  - sdp_gold: For storing gold tables
- The check_list_tables.py script will display all the created elements
Content of the init_schemas.py file :
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

REMOTE_URL = "sc://localhost:15002"

spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()

spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_bronze COMMENT 'Schema for SDP Pipelines - Bronze Zone'")
spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_silver COMMENT 'Schema for SDP Pipelines - Silver Zone'")
spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_gold COMMENT 'Schema for SDP Pipelines - Gold Zone'")

list_schemas = spark.sql("show schemas;").where(col("namespace").like('sdp_%'))

nb_sdp_schema = list_schemas.count()

if (nb_sdp_schema == 3):
    print(f"[INFO] The SDP Schemas have been successfully created !")
else :
    print(f"[WARN] The SDP Schemas were not created successfully !")

list_schemas.show()

spark.stop()
Content of the check_list_tables.py file :
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

REMOTE_URL = "sc://localhost:15002"

spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()

list_tables = ['clients','products','orders']
list_tables_gold = ['dist_by_client','dist_by_product']

print(f"[INFO] Data from SDP_BRONZE :")
for table in list_tables :
    tmp_table = f"sdp_bronze.raw_{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

print(f"[INFO] Data from SDP_SILVER :")
for table in list_tables :
    tmp_table = f"sdp_silver.refined_{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

print(f"[INFO] Data from SDP_GOLD :")
for table in list_tables_gold :
    tmp_table = f"sdp_gold.{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

spark.stop()
Result of running the created pipeline
Pipeline Execution
All commands will be executed from the application/LoadOrdersData/ directory.
The execution order is as follows :
- Run the schema initialization script using the command python init_schemas.py
- Verify that the message displayed by the init_schemas.py script is [INFO] The SDP Schemas have been successfully created !
- Run the pipeline using the command spark-pipelines run --spec spark-pipeline.yml
- Run the script to verify the created objects using the command python check_list_tables.py
Execution result
Output of running the command python init_schemas.py :
[INFO] The SDP Schemas have been successfully created !
+----------+
| namespace|
+----------+
|sdp_bronze|
|  sdp_gold|
|sdp_silver|
+----------+
Output of running the command spark-pipelines run --spec spark-pipeline.yml :
2026-03-30 19:31:25: Loading pipeline spec from spark-pipeline.yml...
2026-03-30 19:31:25: Creating Spark session...
2026-03-30 19:31:25: Creating dataflow graph...
2026-03-30 19:31:25: Registering graph elements...
2026-03-30 19:31:25: Loading definitions. Root directory: '.../application/LoadOrdersData'.
2026-03-30 19:31:25: Found 3 files matching glob 'transformations/**/*'
2026-03-30 19:31:25: Importing .../application/LoadOrdersData/transformations/load_bronze_orders.py...
2026-03-30 19:31:27: Registering SQL file .../application/LoadOrdersData/transformations/load_gold_orders.sql...
2026-03-30 19:31:28: Registering SQL file .../application/LoadOrdersData/transformations/load_silver_orders.sql...
2026-03-30 19:31:28: Starting run...
2026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_product is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_orders is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_products is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_client is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_clients is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is RUNNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is RUNNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is RUNNING.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_orders has COMPLETED.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_products has COMPLETED.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_clients has COMPLETED.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is RUNNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is RUNNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is RUNNING.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_clients has COMPLETED.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_products has COMPLETED.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_orders has COMPLETED.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is PLANNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is STARTING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is RUNNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is PLANNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is STARTING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is RUNNING.
2026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_product has COMPLETED.
2026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_client has COMPLETED.
2026-03-30 19:32:17: Run is COMPLETED.
Output of running the command python check_list_tables.py :
[INFO] Data from SDP_BRONZE :
- sdp_bronze.raw_clients :
+---------+----------+---------+--------------------------+
|id_client|lib_client|lib_city |ts_load_file              |
+---------+----------+---------+--------------------------+
|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
|4        |David     |Lille    |2026-03-30 19:31:38.862824|
|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
+---------+----------+---------+--------------------------+

- sdp_bronze.raw_products :
+----------+-----------+-------------+------------+--------------------------+
|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
+----------+-----------+-------------+------------+--------------------------+
|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
+----------+-----------+-------------+------------+--------------------------+

- sdp_bronze.raw_orders :
+-----------+---------+----------+-----------+-----------+--------------------------+
|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
+-----------+---------+----------+-----------+-----------+--------------------------+
|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
+-----------+---------+----------+-----------+-----------+--------------------------+

[INFO] Data from SDP_SILVER :
- sdp_silver.refined_clients :
+---------+----------+---------+--------------------------+
|id_client|lib_client|lib_city |ts_load_file              |
+---------+----------+---------+--------------------------+
|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
|4        |David     |Lille    |2026-03-30 19:31:38.862824|
|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
+---------+----------+---------+--------------------------+

- sdp_silver.refined_products :
+----------+-----------+-------------+------------+--------------------------+
|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
+----------+-----------+-------------+------------+--------------------------+
|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
+----------+-----------+-------------+------------+--------------------------+

- sdp_silver.refined_orders :
+-----------+---------+----------+-----------+-----------+--------------------------+
|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
+-----------+---------+----------+-----------+-----------+--------------------------+
|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
+-----------+---------+----------+-----------+-----------+--------------------------+

[INFO] Data from SDP_GOLD :
- sdp_gold.dist_by_client :
+---------+----------+-------------+
|id_client|lib_client|total_expense|
+---------+----------+-------------+
|1        |Alice     |1505.0       |
|3        |Charlie   |1345.0       |
|4        |David     |1335.0       |
|2        |Bob       |1330.0       |
|5        |Eve       |405.0        |
+---------+----------+-------------+

- sdp_gold.dist_by_product :
+----------+-----------+------------+
|id_product|lib_product|sales_volume|
+----------+-----------+------------+
|102       |Souris     |7           |
|103       |Clavier    |5           |
|105       |Casque     |4           |
|101       |Laptop     |4           |
|104       |Ecran      |2           |
+----------+-----------+------------+

