Spark : v4.x - Features - Spark Declarative Pipelines

In this article, you will find information about the Spark Declarative Pipelines (SDP) feature in Spark v4.1.

Introduction

In June 2022, Databricks announced the general availability of a new feature named Delta Live Tables (DLT) within its ecosystem, enabling the declarative development of Spark processing pipelines. This feature was primarily based on the Delta Lake data format and the Unity Catalog data catalog.

In June 2025, during the Data+AI Summit 2025 event, Databricks announced that it was open-sourcing the core of its declarative data processing framework for Apache Spark. However, a large number of features and optimizations were left out of this release and remain exclusive to the Databricks platform.

In July 2025, Databricks announced the renaming of the Delta Live Tables feature to Lakeflow Declarative Pipelines, which became the official name of this feature within the Databricks platform.

Information

Spark Declarative Pipelines (SDP) is a new component of Apache Spark 4.1 that lets developers focus on data transformations rather than on explicitly managing dependencies and pipeline execution. In practice, with the SDP framework you describe what the data should look like, and Spark handles the “how”: task scheduling, parallelism, checkpoints, and retries.

The API relies on Python decorators and SQL, with a dedicated command-line client (spark-pipelines) for running pipelines.

The SDP framework manages execution order, dependency resolution, and incremental processing, which simplifies pipeline development, maintenance, testing, and monitoring.

The framework is similar to tools like dbt or SQLMesh, but it is still immature and its ecosystem is very limited (features, documentation, optimizations, capabilities, compatibility, etc.).

The official programming guide is extremely clear and concise for understanding the key concepts.

Key elements:

  • The pipeline definition YAML file defines the structural elements used to parse and execute flows.
  • Pipelines can be written as Python scripts (with decorators) and as SQL scripts.
  • Three types of datasets are available:
    • Streaming Table: a table that stores data processed with Spark Structured Streaming.
      • Python syntax: @dp.table or dp.create_streaming_table
      • SQL syntax: CREATE STREAMING TABLE
    • Materialized View: a table that stores the result of a batch query.
      • Python syntax: @dp.materialized_view
      • SQL syntax: CREATE MATERIALIZED VIEW
    • Temporary View: a view that holds intermediate results during pipeline execution.
      • Python syntax: @dp.temporary_view
      • SQL syntax: CREATE TEMPORARY VIEW

Specific SDP framework commands:

  • spark-pipelines init: initializes a new project.
  • spark-pipelines dry-run --spec pipeline_def.yaml: validates a pipeline without executing it (very useful for validating changes in CI/CD pipelines).
  • spark-pipelines run --spec pipeline_def.yaml: executes the pipeline.
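A CI/CD job can wrap the dry-run command so that the job fails when validation fails. Below is a minimal Python sketch. It assumes that spark-pipelines is on the PATH and that it returns a non-zero exit code when validation fails, which is worth checking on your Spark version. The helper names build_command and validate_pipeline are hypothetical, and the sketch uses only the flags listed above.

```python
import shutil
import subprocess
import sys

# Only the sub-commands documented above that take a --spec argument.
SUPPORTED_ACTIONS = ("dry-run", "run")


def build_command(action: str, spec: str) -> list[str]:
    """Build a spark-pipelines command line using only the flags shown above."""
    if action not in SUPPORTED_ACTIONS:
        raise ValueError(f"Unsupported action: {action}")
    return ["spark-pipelines", action, "--spec", spec]


def validate_pipeline(spec: str) -> int:
    """Run a dry-run and return its exit code (assumed non-zero when validation fails)."""
    if shutil.which("spark-pipelines") is None:
        print("[ERROR] spark-pipelines was not found on the PATH", file=sys.stderr)
        return 127
    completed = subprocess.run(build_command("dry-run", spec), check=False)
    return completed.returncode


if __name__ == "__main__":
    # Hypothetical CI usage: python validate_pipeline.py pipeline_def.yaml
    spec_file = sys.argv[1] if len(sys.argv) > 1 else "pipeline_def.yaml"
    sys.exit(validate_pipeline(spec_file))
```

Propagating the exit code through sys.exit is what makes the CI step fail.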

How the SDP framework runs a pipeline:

  1. Execution starts.
  2. The pipeline definition file is read.
  3. A Spark session is created from the configuration defined in the pipeline definition file.
  4. All Python and SQL files referenced in the pipeline definition file are parsed.
  5. The dataflow graph is generated, which lets the engine plan the execution order of the pipeline elements from their declared dependencies.
  6. The flows are executed in that order (independent flows in parallel, dependent flows in sequence).
  7. Execution ends.

Documentation:

Warning: During testing, the framework struggled to build the dependency graph when the code was written entirely with Python decorators. Many errors came from the framework looking for a table in the data catalog rather than among the datasets defined in the pipeline scripts. For the time being, SQL definitions seem to be handled much better.

Advantages

  1. Automatic dependency resolution and optimized pipeline execution: This is the most tangible benefit. The order in which Python functions or SQL queries are defined does not matter. The SDP framework analyzes the graph and determines the execution order: queries with no direct dependency between them run in parallel, and queries that depend on others run after them.
  2. Pre-execution validation with dry-run: The dry-run feature detects syntax errors (Python or SQL), analysis errors (missing tables or columns), and graph validation errors such as cyclic dependencies, all without reading or writing any data. This allows automated checks within CI/CD pipelines.
  3. Format-agnostic: The SDP framework supports all data sources and catalogs supported by Spark. Table formats such as Iceberg or Delta Lake are recommended.
  4. Easier maintenance: A pipeline built with a declarative framework is much easier to understand and maintain than a set of imperative PySpark jobs.
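The following conceptual sketch illustrates two of the graph checks a dry-run performs: unresolved inputs and cyclic dependencies. It uses Python's standard graphlib module. It is not how the SDP framework implements validation. The dictionary representation of the graph, the external_tables parameter and the validate_graph function are all assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def validate_graph(datasets: dict[str, set[str]], external_tables: set[str]) -> list[str]:
    """Return validation errors for a dataset dependency graph.

    datasets: each dataset defined in the pipeline -> names of the datasets it reads.
    external_tables: tables assumed to already exist in the catalog.
    """
    errors = []
    known = set(datasets) | external_tables
    # Analysis-style check: every input must be defined in the pipeline or the catalog.
    for name in sorted(datasets):
        for source in sorted(datasets[name] - known):
            errors.append(f"{name}: unresolved input '{source}'")
    # Graph check: only edges between pipeline datasets can form a cycle.
    internal = {name: inputs & set(datasets) for name, inputs in datasets.items()}
    try:
        TopologicalSorter(internal).prepare()
    except CycleError as exc:
        cycle = exc.args[1]  # list of nodes; the first and last nodes are the same
        errors.append("cyclic dependency: " + " -> ".join(cycle))
    return errors


if __name__ == "__main__":
    pipeline = {
        "silver.orders": {"bronze.orders"},
        "gold.kpi": {"silver.orders", "silver.clients"},
    }
    print(validate_graph(pipeline, external_tables={"bronze.orders"}))
    print(validate_graph({"a": {"b"}, "b": {"a"}}, external_tables=set()))
```

Neither check reads any data. They only inspect the declared dependencies, which is why this kind of validation can run before anything executes.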

Limitations

  1. Ecosystem immaturity (features and tooling): The SDP framework is in a very active development cycle. The behavior of some features may change between Spark versions, and not everything is documented yet. It is far too early for production adoption. Popular orchestrators (Apache Airflow, Prefect, Dagster) do not necessarily support this framework, or support it only in a very limited way.
  2. Premium features remain exclusive to Databricks: Features not available in the open-source SDP framework include, for example:
    1. Auto CDC (Change Data Capture)
    2. Expectations with automatic enforcement (@dp.expect, automated quarantine of invalid rows)
    3. Visual data lineage and integrated advanced observability
    4. Queryable event logs for pipeline auditing
  3. Less intuitive debugging than imperative code: Stack traces reference internal SDP framework classes rather than the specific lines of Python or SQL code. This is particularly painful for teams used to debugging Spark jobs step by step, reading data with spark.read and inspecting it with df.show().
  4. Functional and optimization limitations: There are far fewer options for handling complex pipelines and processing, which may require falling back to imperative code (e.g., state management in streaming, or unsupported functions such as pivot).

Code

Approach

To test the SDP framework, we will follow these steps:

  1. Create a local Spark v4.1.1 cluster
  2. Create a small dataset
  3. Create a pipeline within an SDP framework project
  4. Execute the created pipeline

The folders used for this project are:

  • application : Folder for storing all scripts
  • data : Folder for storing datasets (input and output)
  • logs : Folder for storing Spark cluster execution logs
  • spark-image-docker : Folder for storing the files required to create the Docker image

Spark Cluster

To test the SDP framework with Spark 4.1.1, we will create a Docker image and the components needed to run a local standalone cluster with one Master node and two Worker nodes. The Master container also runs the Spark Connect server (which hosts the driver) and the History Server.

Creating a Docker Image

Steps to perform (the files are created in the spark-image-docker directory, and the build command is run from the root directory):

  1. Create the spark-defaults.conf file to define the Spark cluster configuration.
  2. Create the spark-env.sh file to define the environment variables for the Spark cluster.
  3. Create the spark-start.sh file to define the startup script for the Spark cluster services.
  4. Create the Dockerfile to define the content of the Spark image used by the cluster.
  5. Build the Docker image from the root directory: docker build -t spark4 spark-image-docker --no-cache

Content of the spark-defaults.conf file:

# --- Force JARs into Classpath ---
spark.driver.extraClassPath      /opt/spark/jars/*
spark.executor.extraClassPath    /opt/spark/jars/*

# --- History Server Configuration ---
spark.eventLog.enabled              true
spark.eventLog.dir                  file:///opt/spark/event_logs
spark.history.fs.logDirectory       file:///opt/spark/event_logs

# --- Delta Configuration ---
spark.connect.extensions.relation.classes   org.apache.spark.sql.connect.delta.DeltaRelationPlugin
spark.connect.extensions.command.classes    org.apache.spark.sql.connect.delta.DeltaCommandPlugin
spark.sql.extensions                        io.delta.sql.DeltaSparkSessionExtension

# --- Database Configurations ---
spark.sql.catalog.spark_catalog             org.apache.spark.sql.delta.catalog.DeltaCatalog


# --- Default Catalog configuration ---
spark.sql.defaultCatalog                spark_catalog

# --- Default warehouse local path configuration ---
spark.sql.warehouse.dir              file:///opt/spark/data/warehouse
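A small parser can check which properties the containers will pick up, for example that the Delta catalog and session extension are set. This is a simplified sketch that assumes the whitespace-separated "key value" layout used above. Spark actually loads this file as a Java properties file, which also accepts "=" or ":" separators, escapes and line continuations; the sketch handles none of these. The function name parse_spark_defaults is hypothetical.

```python
from pathlib import Path


def parse_spark_defaults(text: str) -> dict[str, str]:
    """Parse 'key value' lines, skipping blank lines and # comments (simplified)."""
    properties = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first run of whitespace: the key, then the rest as the value.
        parts = line.split(None, 1)
        properties[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return properties


if __name__ == "__main__":
    conf = parse_spark_defaults(Path("spark-image-docker/spark-defaults.conf").read_text())
    for key in ("spark.sql.extensions", "spark.sql.catalog.spark_catalog"):
        print(f"{key} = {conf.get(key, '<not set>')}")
```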

Content of the spark-env.sh file:

#!/usr/bin/env bash

export SPARK_LOCAL_IP=`hostname -i`
export SPARK_PUBLIC_DNS=`hostname -f`

Content of the spark-start.sh file:

#!/bin/bash

# Start the SSH daemon
/usr/sbin/sshd
if [ $? -ne 0 ]; then
    echo "Failed to start SSH server. Exiting."
    exit 1
fi

if [ "$SPARK_MODE" = "master" ]; then
    echo "Starting Spark Master..."
    # Spark Master (standalone cluster manager)
    $SPARK_HOME/sbin/start-master.sh
    # Spark Connect server (hosts the driver); without --master, spark-submit
    # falls back to local mode instead of using the standalone cluster
    $SPARK_HOME/sbin/start-connect-server.sh --master "$SPARK_MASTER"
    # History server
    $SPARK_HOME/sbin/start-history-server.sh
elif [ "$SPARK_MODE" = "worker" ]; then
    echo "Starting Spark Worker..."
    # Spark Worker
    $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER_URL
else
    echo "Unknown SPARK_MODE: $SPARK_MODE"
    exit 1
fi

# Keep the container alive
tail -f $SPARK_HOME/logs/*

Content of the Dockerfile file:

# Use an Eclipse Temurin (OpenJDK) 21 base image
FROM eclipse-temurin:21-jdk-jammy


# Define env variables
ENV SPARK_MASTER="spark://spark-master:7077"
ENV SPARK_MASTER_HOST=spark-master
ENV SPARK_MASTER_PORT=7077
ENV PYSPARK_PYTHON=python3
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
ENV DELTA_VERSION="4.1.0"
ENV SPARK_VERSION="4.1.1"
ENV SCALA_VERSION="2.13"


# Install tools
RUN apt-get update \
    && apt-get install -y --no-install-recommends wget tar iputils-ping rsync openssh-server openssh-client \
    && apt-get install -y --no-install-recommends python3 python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Configure SSH
RUN mkdir -p /root/.ssh/ \
    && ssh-keygen -t rsa -f /root/.ssh/id_rsa -q -N "" \
    && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
    && chmod 600 /root/.ssh/authorized_keys \
    && echo "Host *" >> /root/.ssh/config \
    && echo "    StrictHostKeyChecking no" >> /root/.ssh/config \
    && chmod 600 /root/.ssh/config \
    && mkdir -p /var/run/sshd \
    && ssh-keygen -A

# Install Spark
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz
RUN tar -xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz \
    && rm spark-${SPARK_VERSION}-bin-hadoop3.tgz \
    && mv spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME} \
    && chown -R root:root ${SPARK_HOME} \
    && mkdir -p ${SPARK_HOME}/logs \
    && mkdir -p ${SPARK_HOME}/event_logs

# Download the Delta JARs directly into Spark's main jars directory.
# Spark automatically loads all JARs from this folder on startup.
RUN wget https://repo1.maven.org/maven2/io/delta/delta-spark_${SCALA_VERSION}/${DELTA_VERSION}/delta-spark_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
RUN wget https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/
RUN wget https://repo1.maven.org/maven2/io/delta/delta-connect-client_${SCALA_VERSION}/${DELTA_VERSION}/delta-connect-client_${SCALA_VERSION}-${DELTA_VERSION}.jar -P ${SPARK_HOME}/jars/

# Set up Spark configuration for logging and history server
COPY spark-defaults.conf $SPARK_HOME/conf/spark-defaults.conf

# Set up Spark configuration scripts
COPY spark-env.sh $SPARK_HOME/conf/spark-env.sh
COPY spark-start.sh $SPARK_HOME/spark-start.sh
RUN chmod +x $SPARK_HOME/conf/spark-env.sh
RUN chmod +x $SPARK_HOME/spark-start.sh

# Expose needed ports (18080: History Server UI, published in compose.yml)
EXPOSE 7077 8080 4040 15002 18080 22

# Entrypoint config
CMD ["/opt/spark/spark-start.sh"]
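The three Delta JAR URLs in the Dockerfile follow the Maven Central layout group/artifact/version/artifact-version.jar. A long image build can fail late because one of these versions does not exist, so a short Python check can confirm that each URL resolves before building. This is a sketch that assumes outbound HTTPS access to repo1.maven.org. The helpers delta_jar_url and jar_exists are hypothetical, and they only rebuild the same URLs as the RUN wget lines.

```python
import urllib.request

MAVEN_BASE = "https://repo1.maven.org/maven2/io/delta"


def delta_jar_url(artifact: str, version: str) -> str:
    """Build the Maven Central URL of an io.delta JAR (same pattern as the Dockerfile)."""
    return f"{MAVEN_BASE}/{artifact}/{version}/{artifact}-{version}.jar"


def jar_exists(url: str, timeout: float = 10.0) -> bool:
    """Send a HEAD request and report whether the JAR is available."""
    request = urllib.request.Request(url, method="HEAD")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError (e.g. 404) and socket timeouts are all OSError subclasses.
        return False


if __name__ == "__main__":
    delta_version, scala_version = "4.1.0", "2.13"
    artifacts = [
        f"delta-spark_{scala_version}",
        "delta-storage",
        f"delta-connect-client_{scala_version}",
    ]
    for artifact in artifacts:
        url = delta_jar_url(artifact, delta_version)
        print(("OK      " if jar_exists(url) else "MISSING ") + url)
```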

Creating the Docker Compose file

Steps to perform from the root directory:

  1. Create the compose.yml file to define the various cluster components, which will consist of one Master node and two Worker nodes.
  2. Start the local cluster using the command docker-compose up -d
  3. Stop the local cluster using the command docker-compose down

Content of the compose.yml file:

services:

  spark-master:
    image: spark4
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=false
      - SPARK_RPC_ENCRYPTION_ENABLED=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=false
      - SPARK_SSL_ENABLED=false
      - SPARK_PUBLIC_DNS=spark-master
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_DRIVER_MEMORY=1g
      - SPARK_DRIVER_CORES=1
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_MASTER_WEBUI_PORT=8080
    ports:
      - "4040:4040"   # Application UI (Job Details)
      - "8080:8080"   # Master web UI
      - "7077:7077"   # Spark master communication port
      - "15002:15002" # Spark Connect port
      - "18080:18080" # History Server UI
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1G
    volumes:
      - ./data:/opt/spark/data
      - ./logs/events:/opt/spark/event_logs
      - ./application/src:/home/root/src
    networks:
      - spark-network


  spark-worker-1:
    image: spark4
    container_name: spark-worker-1
    hostname: spark-worker-1
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_RPC_AUTHENTICATE=false
      - SPARK_RPC_ENCRYPTION=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
      - SPARK_SSL_ENABLED=no
      - SPARK_PUBLIC_DNS=spark-worker-1
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=2g
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8081:8081" # Worker web UI
    volumes:
      - ./data:/opt/spark/data
    networks:
      - spark-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G


  spark-worker-2:
    image: spark4
    container_name: spark-worker-2
    hostname: spark-worker-2
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_RPC_AUTHENTICATE=false
      - SPARK_RPC_ENCRYPTION=false
      - SPARK_LOCAL_STORAGE_ENCRYPTION=false
      - SPARK_SSL_ENABLED=no
      - SPARK_PUBLIC_DNS=spark-worker-2
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=2g
      - SPARK_EXECUTOR_MEMORY=1g
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8082:8081" # Worker web UI
    volumes:
      - ./data:/opt/spark/data
    networks:
      - spark-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G


networks:
  spark-network:
    driver: bridge

Content of the log file after running the command docker-compose up -d:

[+] up 4/4
 ✔ Network spark4_spark-network Created
 ✔ Container spark-master       Created
 ✔ Container spark-worker-1     Created
 ✔ Container spark-worker-2     Created

Content of the log file after running the command docker-compose down:

[+] down 4/4
 ✔ Container spark-worker-2     Removed
 ✔ Container spark-worker-1     Removed
 ✔ Container spark-master       Removed
 ✔ Network spark4_spark-network Removed
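After docker-compose up -d, the containers exist, but the Spark Connect server (port 15002 in compose.yml) can take a few seconds to start accepting connections. A small standard-library check can wait for that port before you run the Python scripts used later. This is a sketch: the wait_for_port helper and its timeout values are assumptions. It only checks that a TCP connection succeeds, not that Spark Connect is fully ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: the service is not listening yet.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # Host ports published in compose.yml
    for name, port in [("Spark Connect", 15002), ("Master web UI", 8080)]:
        status = "ready" if wait_for_port("localhost", port) else "not reachable"
        print(f"{name} (localhost:{port}): {status}")
```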

List of ports and interfaces

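Based on the configuration in the compose.yml file, the following ports are published on the host:

  • 8080: Spark Master web UI
  • 8081 and 8082: web UIs of spark-worker-1 and spark-worker-2
  • 4040: application UI (job details) of the running Spark application
  • 18080: History Server UI
  • 7077: Spark master communication port
  • 15002: Spark Connect port (used by the scripts through sc://localhost:15002)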

Dataset

In the data directory (commands run from the root directory):

  1. Create an input directory: mkdir -p data/input
  2. Create the clients.csv file to store a few rows defining customers
  3. Create the products.csv file to store a few rows defining products
  4. Create the orders.csv file to store a few rows defining product purchase events by customers

Content of the clients.csv file:

id_client,lib_client,lib_city
1,Alice,Paris
2,Bob,Lyon
3,Charlie,Marseille
4,David,Lille
5,Eve,Bordeaux

Content of the products.csv file:

id_product,lib_product,mnt_prix_unit,lib_category
101,Laptop,1200.0,Electronique
102,Souris,25.0,Accessoires
103,Clavier,45.0,Accessoires
104,Ecran,200.0,Electronique
105,Casque,80.0,Audio

Content of the orders.csv file:

id_commande,id_client,id_product,nb_quantity,dt_commande
1,1,101,1,2025-10-01
2,2,102,2,2025-10-01
3,3,103,1,2025-10-02
4,1,104,1,2025-10-02
5,5,105,2,2025-10-03
6,2,101,1,2025-10-03
7,4,103,3,2025-10-04
8,1,102,1,2025-10-04
9,3,101,1,2025-10-05
10,5,104,1,2025-10-05
11,2,105,1,2025-10-06
12,4,101,1,2025-10-06
13,1,105,1,2025-10-07
14,3,102,4,2025-10-07
15,5,103,1,2025-10-08
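The gold queries join orders to clients and products with INNER JOINs. An order that references a customer or product missing from the other files would therefore be silently dropped from the aggregates. A quick check with Python's csv module can catch this before the pipeline runs. This sketch assumes the three files are in data/input with the headers shown above. The functions read_ids and find_orphan_orders are hypothetical.

```python
import csv
from pathlib import Path

INPUT_DIR = Path("data/input")


def read_ids(path: Path, column: str) -> set[str]:
    """Collect the values of one column from a CSV file with a header row."""
    with path.open(newline="") as handle:
        return {row[column] for row in csv.DictReader(handle)}


def find_orphan_orders(
    orders: list[dict[str, str]], client_ids: set[str], product_ids: set[str]
) -> list[str]:
    """Return one message per missing client or product reference."""
    problems = []
    for order in orders:
        if order["id_client"] not in client_ids:
            problems.append(f"order {order['id_commande']}: unknown client {order['id_client']}")
        if order["id_product"] not in product_ids:
            problems.append(f"order {order['id_commande']}: unknown product {order['id_product']}")
    return problems


if __name__ == "__main__":
    clients = read_ids(INPUT_DIR / "clients.csv", "id_client")
    products = read_ids(INPUT_DIR / "products.csv", "id_product")
    with (INPUT_DIR / "orders.csv").open(newline="") as handle:
        orders = list(csv.DictReader(handle))
    issues = find_orphan_orders(orders, clients, products)
    print("\n".join(issues) if issues else f"[INFO] {len(orders)} orders, no orphan references")
```

With the files above, every order references clients 1 to 5 and products 101 to 105, so no orphan should be reported.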

Creating a pipeline in an SDP project

The steps are as follows:

  1. Install the SDP framework locally
  2. Initialize an SDP template project using the spark-pipelines init command
  3. Create the necessary components for the pipeline

Installing the SDP framework locally

To install the components needed to test the SDP framework, use the following pip command: pip install pyspark[pipelines].

Because of dependency compatibility issues in my work environment, I pinned compatible versions with the following command: pip install pyspark==4.1.1 pyspark[pipelines] protobuf==6.33.0
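When versions are pinned like this, it helps to confirm which versions pip actually installed. The standard importlib.metadata module can report them. This is a minimal sketch: the expected versions mirror the pip command above, and installed_version is a hypothetical helper.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    expected = {"pyspark": "4.1.1", "protobuf": "6.33.0"}
    for package, wanted in expected.items():
        found = installed_version(package)
        flag = "OK" if found == wanted else "CHECK"
        print(f"[{flag}] {package}: installed={found}, expected={wanted}")
```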

Initializing an SDP project

To initialize an SDP project, the easiest way is to navigate to the application directory and run the following command: spark-pipelines init --name LoadOrdersData.

This creates a directory named LoadOrdersData within the application directory, with the following structure:

  • pipeline-storage/: directory that stores the checkpoints used by streaming flows
  • transformations/: directory that stores all the Python and SQL scripts defining the pipeline
  • spark-pipeline.yml: pipeline configuration file

Creating the necessary components for the pipeline

The goal is to create the following pipeline:

graph TD;
    A(clients.csv) --load_bronze_orders.py--> D(sdp_bronze.raw_clients)
    B(products.csv) --load_bronze_orders.py--> E(sdp_bronze.raw_products)
    C(orders.csv) --load_bronze_orders.py--> F(sdp_bronze.raw_orders)
    D(sdp_bronze.raw_clients) --load_silver_orders.sql--> G(sdp_silver.refined_clients)
    E(sdp_bronze.raw_products) --load_silver_orders.sql--> H(sdp_silver.refined_products)
    F(sdp_bronze.raw_orders) --load_silver_orders.sql--> I(sdp_silver.refined_orders)
    H(sdp_silver.refined_products) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
    I(sdp_silver.refined_orders) --load_gold_orders.sql--> J(sdp_gold.dist_by_product)
    G(sdp_silver.refined_clients) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
    H(sdp_silver.refined_products) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)
    I(sdp_silver.refined_orders) --load_gold_orders.sql--> K(sdp_gold.dist_by_client)

Diagram: schema_01
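The graph above can be written as a dictionary of dependencies to show how the flows group into stages. The following conceptual sketch uses Python's graphlib to compute successive "waves" of datasets whose inputs are all available. It is not the SDP scheduler. For this pipeline, however, it yields the same three groups (bronze, then silver, then gold) that appear in the run log further below. The names PIPELINE and execution_waves are illustrative.

```python
from graphlib import TopologicalSorter

# Dependencies taken from the diagram: dataset -> datasets it reads.
PIPELINE = {
    "sdp_bronze.raw_clients": set(),
    "sdp_bronze.raw_products": set(),
    "sdp_bronze.raw_orders": set(),
    "sdp_silver.refined_clients": {"sdp_bronze.raw_clients"},
    "sdp_silver.refined_products": {"sdp_bronze.raw_products"},
    "sdp_silver.refined_orders": {"sdp_bronze.raw_orders"},
    "sdp_gold.dist_by_product": {"sdp_silver.refined_orders", "sdp_silver.refined_products"},
    "sdp_gold.dist_by_client": {
        "sdp_silver.refined_orders",
        "sdp_silver.refined_clients",
        "sdp_silver.refined_products",
    },
}


def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group datasets into waves; each dataset only depends on datasets from earlier waves."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for number, wave in enumerate(execution_waves(PIPELINE), start=1):
        print(f"Wave {number}: {', '.join(wave)}")
```

Grouping into waves only makes the structure visible. A real scheduler does not have to wait for a whole wave to finish and can start a flow as soon as its own inputs are complete.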

The steps are as follows:

  • Create the Python script load_bronze_orders.py to define the tables for storing data from the CSV files
  • Create the SQL script load_silver_orders.sql to refine the tables from the bronze zone into the silver zone
  • Create the SQL script load_gold_orders.sql to execute aggregation queries based on the tables from the silver zone

Content of the load_bronze_orders.py file:

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import current_timestamp


PATH_DATA_INPUT = "file:///opt/spark/data/input"
spark = SparkSession.active()


# --- Bronze zone (raw loading) ---
@dp.materialized_view(name="sdp_bronze.raw_orders")
def raw_orders() -> DataFrame:
    return spark.read \
                .format("csv") \
                .option("header", True) \
                .option("inferSchema", True) \
                .load(f"{PATH_DATA_INPUT}/orders.csv") \
                .withColumn("ts_load_file", current_timestamp())

@dp.materialized_view(name="sdp_bronze.raw_clients")
def raw_clients() -> DataFrame:
    return spark.read \
                .format("csv") \
                .option("header", True) \
                .option("inferSchema", True) \
                .load(f"{PATH_DATA_INPUT}/clients.csv") \
                .withColumn("ts_load_file", current_timestamp())

@dp.materialized_view(name="sdp_bronze.raw_products")
def raw_products() -> DataFrame:
    return spark.read \
                .format("csv") \
                .option("header", True) \
                .option("inferSchema", True) \
                .load(f"{PATH_DATA_INPUT}/products.csv") \
                .withColumn("ts_load_file", current_timestamp())

Content of the load_silver_orders.sql file:

--- Silver Zone (cleaned & typed) ---
CREATE MATERIALIZED VIEW sdp_silver.refined_orders
AS
SELECT int(id_commande)
    ,int(id_client)
    ,int(id_product)
    ,int(nb_quantity)
    ,to_date(dt_commande) as dt_commande
    ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_orders
WHERE id_commande IS NOT NULL
;


CREATE MATERIALIZED VIEW sdp_silver.refined_clients
AS
SELECT int(id_client)
    ,string(lib_client)
    ,string(lib_city)
    ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_clients
WHERE id_client IS NOT NULL
;


CREATE MATERIALIZED VIEW sdp_silver.refined_products
AS
SELECT int(id_product)
    ,string(lib_product)
    ,double(mnt_prix_unit)
    ,string(lib_category)
    ,to_timestamp(ts_load_file) as ts_load_file
FROM sdp_bronze.raw_products
WHERE id_product IS NOT NULL
;

Content of the load_gold_orders.sql file:

--- Gold Zone (Business analyses) ---
CREATE MATERIALIZED VIEW sdp_gold.dist_by_client
AS
SELECT c.id_client, c.lib_client, ROUND(SUM(o.nb_quantity * p.mnt_prix_unit), 2) as total_expense
FROM sdp_silver.refined_orders o
INNER JOIN sdp_silver.refined_clients c
ON (o.id_client = c.id_client)
INNER JOIN sdp_silver.refined_products p
ON (o.id_product = p.id_product)
GROUP BY c.id_client, c.lib_client
ORDER BY total_expense DESC
;

CREATE MATERIALIZED VIEW sdp_gold.dist_by_product
AS
SELECT p.id_product, p.lib_product, SUM(o.nb_quantity) as sales_volume
FROM sdp_silver.refined_orders o
INNER JOIN sdp_silver.refined_products p
ON (o.id_product = p.id_product)
GROUP BY p.id_product, p.lib_product
ORDER BY sales_volume DESC
;
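The dataset is tiny, so the gold aggregates can be recomputed without Spark to cross-check the pipeline output. This sketch uses only Python's csv module and assumes the input files are in data/input. It mirrors the two queries above: inner joins, the sum of quantity × unit price rounded to 2 decimals, and the sum of quantities. The names expected_gold and _rows are hypothetical helpers. For the dataset above, it gives total expenses of 1505.0 (Alice), 1345.0 (Charlie), 1335.0 (David), 1330.0 (Bob) and 405.0 (Eve). It gives sales volumes of 7 (Souris), 5 (Clavier), 4 (Laptop and Casque) and 2 (Ecran).

```python
import csv
import io
from collections import defaultdict
from pathlib import Path


def _rows(text: str) -> list[dict[str, str]]:
    """Parse CSV text with a header row into dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


def expected_gold(clients_csv: str, products_csv: str, orders_csv: str):
    """Recompute dist_by_client and dist_by_product from the raw CSV text."""
    clients = {row["id_client"]: row["lib_client"] for row in _rows(clients_csv)}
    products = {row["id_product"]: row for row in _rows(products_csv)}
    expense = defaultdict(float)  # (id_client, lib_client) -> total expense
    volume = defaultdict(int)     # (id_product, lib_product) -> quantity sold
    for order in _rows(orders_csv):
        product = products.get(order["id_product"])
        if product is None:
            continue  # INNER JOIN semantics: unmatched orders are dropped
        quantity = int(order["nb_quantity"])
        volume[(int(order["id_product"]), product["lib_product"])] += quantity
        client_name = clients.get(order["id_client"])
        if client_name is not None:
            key = (int(order["id_client"]), client_name)
            expense[key] += quantity * float(product["mnt_prix_unit"])
    # ORDER BY ... DESC; ties keep first-seen order, whereas Spark leaves tie order unspecified.
    dist_by_client = sorted(
        ((cid, name, round(total, 2)) for (cid, name), total in expense.items()),
        key=lambda row: row[2], reverse=True,
    )
    dist_by_product = sorted(
        ((pid, name, qty) for (pid, name), qty in volume.items()),
        key=lambda row: row[2], reverse=True,
    )
    return dist_by_client, dist_by_product


if __name__ == "__main__":
    input_dir = Path("data/input")
    by_client, by_product = expected_gold(
        (input_dir / "clients.csv").read_text(),
        (input_dir / "products.csv").read_text(),
        (input_dir / "orders.csv").read_text(),
    )
    print("dist_by_client:", by_client)
    print("dist_by_product:", by_product)
```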

Creating elements to manage the environment

To properly manage the environment, we will create two PySpark scripts in the application/LoadOrdersData directory:

  • The init_schemas.py script will create the schemas needed to create the pipeline tables
    • sdp_bronze: For storing bronze tables
    • sdp_silver: For storing silver tables
    • sdp_gold: For storing gold tables
  • The check_list_tables.py script displays all the created elements

Content of the init_schemas.py file:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

REMOTE_URL = "sc://localhost:15002"

spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()

spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_bronze COMMENT 'Schema for SDP Pipelines - Bronze Zone'")
spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_silver COMMENT 'Schema for SDP Pipelines - Silver Zone'")
spark.sql("CREATE SCHEMA IF NOT EXISTS sdp_gold COMMENT 'Schema for SDP Pipelines - Gold Zone'")

list_schemas = spark.sql("show schemas;").where(col("namespace").like('sdp_%'))

nb_sdp_schema = list_schemas.count()

if nb_sdp_schema == 3:
    print("[INFO] The SDP Schemas have been successfully created !")
else:
    print("[WARN] The SDP Schemas were not created successfully !")

list_schemas.show()

spark.stop()

Content of the check_list_tables.py file:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

REMOTE_URL = "sc://localhost:15002"

spark = SparkSession.builder.remote(REMOTE_URL).getOrCreate()

list_tables = ['clients','products','orders']
list_tables_gold = ['dist_by_client','dist_by_product']

print("[INFO] Data from SDP_BRONZE :")
for table in list_tables:
    tmp_table = f"sdp_bronze.raw_{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

print("[INFO] Data from SDP_SILVER :")
for table in list_tables:
    tmp_table = f"sdp_silver.refined_{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

print("[INFO] Data from SDP_GOLD :")
for table in list_tables_gold:
    tmp_table = f"sdp_gold.{table}"
    print(f"- {tmp_table} :")
    spark.table(tmp_table).show(truncate=False)

spark.stop()

Result of running the created pipeline

Pipeline Execution

All commands are executed from the application/LoadOrdersData/ directory.

The execution order is as follows:

  1. Run the schema initialization script using the command python init_schemas.py
  2. Verify that the message displayed by the init_schemas.py script is [INFO] The SDP Schemas have been successfully created !
  3. Run the pipeline using the command spark-pipelines run --spec spark-pipeline.yml
  4. Run the script to verify the created objects using the command python check_list_tables.py

Execution result

Output of running the command python init_schemas.py:

[INFO] The SDP Schemas have been successfully created !
+----------+
| namespace|
+----------+
|sdp_bronze|
|  sdp_gold|
|sdp_silver|
+----------+

Output of running the command spark-pipelines run --spec spark-pipeline.yml:

2026-03-30 19:31:25: Loading pipeline spec from spark-pipeline.yml...
2026-03-30 19:31:25: Creating Spark session...
2026-03-30 19:31:25: Creating dataflow graph...
2026-03-30 19:31:25: Registering graph elements...
2026-03-30 19:31:25: Loading definitions. Root directory: '.../application/LoadOrdersData'.
2026-03-30 19:31:25: Found 3 files matching glob 'transformations/**/*'
2026-03-30 19:31:25: Importing .../application/LoadOrdersData/transformations/load_bronze_orders.py...
2026-03-30 19:31:27: Registering SQL file .../application/LoadOrdersData/transformations/load_gold_orders.sql...
2026-03-30 19:31:28: Registering SQL file .../application/LoadOrdersData/transformations/load_silver_orders.sql...
2026-03-30 19:31:28: Starting run...
2026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_product is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_orders is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_products is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_gold.dist_by_client is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_silver.refined_clients is QUEUED.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_clients is RUNNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_orders is RUNNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is PLANNING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is STARTING.
2026-03-30 19:31:38: Flow spark_catalog.sdp_bronze.raw_products is RUNNING.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_orders has COMPLETED.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_products has COMPLETED.
2026-03-30 19:31:41: Flow spark_catalog.sdp_bronze.raw_clients has COMPLETED.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_orders is RUNNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_clients is RUNNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is PLANNING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is STARTING.
2026-03-30 19:31:42: Flow spark_catalog.sdp_silver.refined_products is RUNNING.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_clients has COMPLETED.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_products has COMPLETED.
2026-03-30 19:31:58: Flow spark_catalog.sdp_silver.refined_orders has COMPLETED.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is PLANNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is STARTING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_product is RUNNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is PLANNING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is STARTING.
2026-03-30 19:31:59: Flow spark_catalog.sdp_gold.dist_by_client is RUNNING.
2026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_product has COMPLETED.
2026-03-30 19:32:16: Flow spark_catalog.sdp_gold.dist_by_client has COMPLETED.
2026-03-30 19:32:17: Run is COMPLETED.
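This log shows the dependency-driven ordering. The three bronze flows start together. The silver flows start once the bronze flows have completed, and the gold flows start after the silver ones. On longer runs, the lines can be parsed into per-flow state transitions to check the ordering or measure durations. This sketch assumes the exact line format shown above ("<timestamp>: Flow <name> is|has <STATE>."). That format is not a documented contract and may change between Spark versions. The functions flow_timeline and durations are hypothetical.

```python
import re
import sys
from datetime import datetime

FLOW_LINE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): "
    r"Flow (?P<flow>\S+) (?:is|has) (?P<state>[A-Z]+)\.$"
)


def flow_timeline(log_lines):
    """Map each flow to {state: first timestamp at which that state was logged}."""
    timeline = {}
    for line in log_lines:
        match = FLOW_LINE.match(line.strip())
        if match is None:
            continue  # not a flow state line (e.g. "Run is COMPLETED.")
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S")
        timeline.setdefault(match["flow"], {}).setdefault(match["state"], ts)
    return timeline


def durations(timeline):
    """Seconds between RUNNING and COMPLETED for each finished flow."""
    return {
        flow: (states["COMPLETED"] - states["RUNNING"]).total_seconds()
        for flow, states in timeline.items()
        if "RUNNING" in states and "COMPLETED" in states
    }


if __name__ == "__main__":
    # Usage: python flow_timeline.py run_output.log (a saved copy of the run output)
    with open(sys.argv[1]) as handle:
        timeline = flow_timeline(handle)
    for flow, seconds in sorted(durations(timeline).items(), key=lambda item: item[1]):
        print(f"{flow}: {seconds:.0f}s")
```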

Output of running the command python check_list_tables.py:

[INFO] Data from SDP_BRONZE :
- sdp_bronze.raw_clients :
+---------+----------+---------+--------------------------+
|id_client|lib_client|lib_city |ts_load_file              |
+---------+----------+---------+--------------------------+
|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
|4        |David     |Lille    |2026-03-30 19:31:38.862824|
|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
+---------+----------+---------+--------------------------+

- sdp_bronze.raw_products :
+----------+-----------+-------------+------------+--------------------------+
|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
+----------+-----------+-------------+------------+--------------------------+
|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
+----------+-----------+-------------+------------+--------------------------+

- sdp_bronze.raw_orders :
+-----------+---------+----------+-----------+-----------+--------------------------+
|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
+-----------+---------+----------+-----------+-----------+--------------------------+
|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
+-----------+---------+----------+-----------+-----------+--------------------------+

[INFO] Data from SDP_SILVER :
- sdp_silver.refined_clients :
+---------+----------+---------+--------------------------+
|id_client|lib_client|lib_city |ts_load_file              |
 49+---------+----------+---------+--------------------------+
 50|1        |Alice     |Paris    |2026-03-30 19:31:38.862824|
 51|2        |Bob       |Lyon     |2026-03-30 19:31:38.862824|
 52|3        |Charlie   |Marseille|2026-03-30 19:31:38.862824|
 53|4        |David     |Lille    |2026-03-30 19:31:38.862824|
 54|5        |Eve       |Bordeaux |2026-03-30 19:31:38.862824|
 55+---------+----------+---------+--------------------------+
 56
 57- sdp_silver.refined_products :
 58+----------+-----------+-------------+------------+--------------------------+
 59|id_product|lib_product|mnt_prix_unit|lib_category|ts_load_file              |
 60+----------+-----------+-------------+------------+--------------------------+
 61|101       |Laptop     |1200.0       |Electronique|2026-03-30 19:31:38.859625|
 62|102       |Souris     |25.0         |Accessoires |2026-03-30 19:31:38.859625|
 63|103       |Clavier    |45.0         |Accessoires |2026-03-30 19:31:38.859625|
 64|104       |Ecran      |200.0        |Electronique|2026-03-30 19:31:38.859625|
 65|105       |Casque     |80.0         |Audio       |2026-03-30 19:31:38.859625|
 66+----------+-----------+-------------+------------+--------------------------+
 67
 68- sdp_silver.refined_orders :
 69+-----------+---------+----------+-----------+-----------+--------------------------+
 70|id_commande|id_client|id_product|nb_quantity|dt_commande|ts_load_file              |
 71+-----------+---------+----------+-----------+-----------+--------------------------+
 72|1          |1        |101       |1          |2025-10-01 |2026-03-30 19:31:38.762104|
 73|2          |2        |102       |2          |2025-10-01 |2026-03-30 19:31:38.762104|
 74|3          |3        |103       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 75|4          |1        |104       |1          |2025-10-02 |2026-03-30 19:31:38.762104|
 76|5          |5        |105       |2          |2025-10-03 |2026-03-30 19:31:38.762104|
 77|6          |2        |101       |1          |2025-10-03 |2026-03-30 19:31:38.762104|
 78|7          |4        |103       |3          |2025-10-04 |2026-03-30 19:31:38.762104|
 79|8          |1        |102       |1          |2025-10-04 |2026-03-30 19:31:38.762104|
 80|9          |3        |101       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 81|10         |5        |104       |1          |2025-10-05 |2026-03-30 19:31:38.762104|
 82|11         |2        |105       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 83|12         |4        |101       |1          |2025-10-06 |2026-03-30 19:31:38.762104|
 84|13         |1        |105       |1          |2025-10-07 |2026-03-30 19:31:38.762104|
 85|14         |3        |102       |4          |2025-10-07 |2026-03-30 19:31:38.762104|
 86|15         |5        |103       |1          |2025-10-08 |2026-03-30 19:31:38.762104|
 87+-----------+---------+----------+-----------+-----------+--------------------------+
 88
 89[INFO] Data from SDP_GOLD :
 90- sdp_gold.dist_by_client :
 91+---------+----------+-------------+
 92|id_client|lib_client|total_expense|
 93+---------+----------+-------------+
 94|1        |Alice     |1505.0       |
 95|3        |Charlie   |1345.0       |
 96|4        |David     |1335.0       |
 97|2        |Bob       |1330.0       |
 98|5        |Eve       |405.0        |
 99+---------+----------+-------------+
100
101- sdp_gold.dist_by_product :
102+----------+-----------+------------+
103|id_product|lib_product|sales_volume|
104+----------+-----------+------------+
105|102       |Souris     |7           |
106|103       |Clavier    |5           |
107|105       |Casque     |4           |
108|101       |Laptop     |4           |
109|104       |Ecran      |2           |
110+----------+-----------+------------+
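The gold figures can be cross-checked by hand from the bronze data. The sketch below recomputes them in plain Python, without a Spark session, assuming `total_expense` is the sum of `nb_quantity * mnt_prix_unit` per client and `sales_volume` is the sum of `nb_quantity` per product. These definitions are inferred from the values shown above rather than taken from the pipeline code, and the constants `PRODUCTS`, `CLIENTS` and `ORDERS` are hypothetical names holding the rows of the bronze tables.

```python
from collections import defaultdict

# Rows copied from the bronze tables above (hypothetical constant names).
PRODUCTS = {  # id_product -> (lib_product, mnt_prix_unit)
    101: ("Laptop", 1200.0), 102: ("Souris", 25.0), 103: ("Clavier", 45.0),
    104: ("Ecran", 200.0), 105: ("Casque", 80.0),
}
CLIENTS = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "David", 5: "Eve"}
ORDERS = [  # (id_commande, id_client, id_product, nb_quantity)
    (1, 1, 101, 1), (2, 2, 102, 2), (3, 3, 103, 1), (4, 1, 104, 1),
    (5, 5, 105, 2), (6, 2, 101, 1), (7, 4, 103, 3), (8, 1, 102, 1),
    (9, 3, 101, 1), (10, 5, 104, 1), (11, 2, 105, 1), (12, 4, 101, 1),
    (13, 1, 105, 1), (14, 3, 102, 4), (15, 5, 103, 1),
]

# Assumed definitions, inferred from the gold tables:
#   total_expense = sum(nb_quantity * mnt_prix_unit) per client
#   sales_volume  = sum(nb_quantity) per product
total_expense = defaultdict(float)
sales_volume = defaultdict(int)
for _id_commande, id_client, id_product, nb_quantity in ORDERS:
    total_expense[id_client] += nb_quantity * PRODUCTS[id_product][1]
    sales_volume[id_product] += nb_quantity

print("dist_by_client (descending total_expense):")
for id_client, total in sorted(total_expense.items(), key=lambda kv: kv[1], reverse=True):
    print(id_client, CLIENTS[id_client], total)

print("dist_by_product (descending sales_volume):")
for id_product, volume in sorted(sales_volume.items(), key=lambda kv: kv[1], reverse=True):
    print(id_product, PRODUCTS[id_product][0], volume)
```

For example, Alice (id_client 1) ordered one Laptop, one Ecran, one Souris and one Casque: 1200 + 200 + 25 + 80 = 1505, which matches the gold table. Products 101 and 105 both have a sales volume of 4; Python's stable sort lists 101 first, whereas the Spark output shows 105 first, because Spark does not guarantee the relative order of rows with equal sort keys unless the sort includes a tie-breaker column.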