Spark : v4.x - Features - SQL Scripts, Session and ANSI compliance

This article presents some information about the SQL features (scripting, session variables, ANSI compliance) in Spark v4.x.

ANSI SQL Compliance

Introduction

ANSI SQL compliance mode enforces stricter SQL semantics that match the ISO/IEC 9075 standard. It enables type safety, prevents silent implicit-cast errors, and enforces reserved keyword restrictions.

Detail

Spark’s default SQL behavior prioritizes ease of use over strict correctness. Implicit type conversions silently succeed even when semantically wrong. Invalid casts return NULL instead of failing. Overflow operations wrap around without errors. Reserved keywords can be used as column names without quoting.

ANSI SQL compliance fundamentally changes these behaviors to match traditional databases such as PostgreSQL and SQL Server. The changes affect three primary areas:

  • Type System Strictness : Queries fail instead of returning truncated or null results silently
  • NULL Handling : ANSI mode enforces stricter NULL semantics in predicates and set operations. NULL = NULL always returns NULL (not true). UNION, INTERSECT, and EXCEPT treat NULL values as equal for deduplication (matching the SQL standard).
  • Reserved Keywords : ANSI mode enforces SQL reserved keyword restrictions. Column names like SELECT, FROM, WHERE must be backtick-quoted. This matches the behavior of traditional databases but breaks Spark SQL queries that use these names freely.
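These NULL semantics can be sketched outside Spark; the following is a minimal plain-Python illustration (helper names `sql_eq` and `dedup` are illustrative, not Spark APIs):

```python
# Three-valued logic: any NULL (None) operand makes '=' yield NULL, not True/False.
def sql_eq(a, b):
    """SQL '=' under ANSI semantics: NULL = NULL -> NULL."""
    if a is None or b is None:
        return None
    return a == b

# Set operations (UNION, INTERSECT, EXCEPT) treat NULLs as equal for deduplication.
def dedup(rows):
    """Deduplicate rows the way UNION does: NULLs compare equal to each other."""
    seen, out = set(), []
    for row in rows:
        key = tuple("\0NULL" if v is None else v for v in row)  # sentinel for NULL
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

print(sql_eq(None, None))             # None (i.e. NULL)
print(dedup([(1, None), (1, None)]))  # [(1, None)] - duplicate NULL rows collapsed
```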

The implementation affects multiple Spark subsystems:

  • Catalyst Optimizer: Adds type validation rules during the analysis phase. Rejects queries with type mismatches earlier in planning.
  • Code Generation: Changes generated code for arithmetic operations to include overflow checks. Adds branch instructions that check for overflow conditions before returning results.
  • Runtime Execution: Wraps operations in exception handlers that fail tasks on ANSI violations. This changes error handling from silent NULL propagation to explicit failures.
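The overflow checks that ANSI mode adds to generated code can be sketched as follows (a plain-Python stand-in; Spark's real codegen emits Java, and these function names are illustrative):

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1

def checked_add_int(a, b):
    """ANSI-style 32-bit addition: raise on overflow instead of wrapping."""
    result = a + b
    if result < INT_MIN or result > INT_MAX:
        raise ArithmeticError("[ARITHMETIC_OVERFLOW] integer overflow")
    return result

def wrapping_add_int(a, b):
    """Default-mode behavior: wrap around silently, like Java's int."""
    return (a + b + 2**31) % 2**32 - 2**31

print(wrapping_add_int(2147483647, 1000000))  # -2146483649 (silent wraparound)
# checked_add_int(2147483647, 1000000) would raise ArithmeticError instead
```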

The mode is configured per-session via spark.sql.ansi.enabled. Once enabled, all queries in that session follow ANSI rules.

Advice

  • spark.sql.ansi.enabled = true :
    • Master switch for ANSI compliance. Enables all strict behaviors: overflow checking, type safety, division semantics.
    • Must be set before creating the session for consistent behavior.
    • Cannot be changed mid-session in some Spark deployment modes.
  • TRY_CAST vs CAST in ANSI Mode :
    • CAST() throws exceptions on conversion errors in ANSI mode. TRY_CAST() returns NULL but explicitly signals validation failure.
    • Use TRY_CAST() for data validation pipelines where bad rows should be filtered/logged rather than failing the entire job.
  • typeof() Function :
    • Returns the data type of an expression result. Shows whether result is INT, DECIMAL, DOUBLE, etc. Use in migration validation to identify type changes.
  • Reserved Keyword Quoting
    • ANSI mode enforces backtick quoting for reserved words. Queries with column names like user, timestamp, order must be rewritten. This is a breaking change for large SQL codebases.
    • Run static analysis to identify affected queries before migration.
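The CAST vs TRY_CAST contrast above can be emulated outside Spark; here is a minimal Python sketch (the `cast_int` and `try_cast_int` helpers are hypothetical, not Spark APIs):

```python
def cast_int(s):
    """ANSI-style CAST: invalid input raises instead of returning NULL."""
    try:
        return int(s.strip())
    except (ValueError, AttributeError):
        raise ValueError(f"[CAST_INVALID_INPUT] cannot cast '{s}' to INT")

def try_cast_int(s):
    """TRY_CAST-style: return None (NULL) on failure so callers can filter/log bad rows."""
    try:
        return int(s.strip())
    except (ValueError, AttributeError):
        return None

# Validation-pipeline pattern: keep good rows, drop (or log) the rest.
rows = ["12345", "abc123", "42"]
good = [v for v in (try_cast_int(r) for r in rows) if v is not None]
print(good)  # [12345, 42]
```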

Advantages

  1. Improved Type Safety : Strict type checking catches errors at query compilation instead of runtime. Schema evolution issues appear earlier. Prevents data quality issues from propagating through data pipelines.
  2. Fail-Fast Error Detection : Type mismatches and arithmetic errors fail immediately instead of producing silent incorrect results. A pipeline that processes financial transactions won’t silently corrupt data on overflow. Errors surface during testing rather than in production.
  3. Database Migration Compatibility : Queries that worked correctly in the source database behave identically in Spark, reducing migration risk and testing burden.
  4. Regulatory Compliance : Financial services and healthcare industries require deterministic, auditable query behavior. ANSI mode provides predictable semantics that match regulatory expectations.

Limitations

  1. Breaking Changes to Existing Queries : Queries that relied on Spark’s lenient behavior will fail. CAST operations that previously returned NULL now throw exceptions. Arithmetic that previously wrapped or truncated silently now raises errors. Large codebases require extensive testing and refactoring.
  2. Performance Overhead : Each addition, multiplication, and division includes conditional branches that check for overflow. Workloads that perform heavy arithmetic on large volumes of data may see reduced performance.
  3. All-or-Nothing Configuration : Cannot enable ANSI mode for specific queries or tables. The session-wide setting affects all operations. Gradual migration requires running two separate Spark sessions with different configurations.
  4. A Single Bad Row Can Fail an Entire Job : A single row with a type conversion error fails the whole task. In default mode, bad rows become NULL and processing continues; ANSI mode lacks row-level error recovery. It requires pre-validation or a separate error-handling pipeline.

Real-World Use Cases

  • Use Case 1: PostgreSQL to Spark Migration
    • A company migrates 500+ SQL queries from a PostgreSQL data warehouse to a Spark lakehouse. Queries contain integer division, overflow-prone calculations, and strict type dependencies. ANSI mode enables running queries unmodified. Without ANSI mode, a significant percentage of queries may need to be rewritten to match PostgreSQL semantics.
  • Use Case 2: Financial Calculation Accuracy
    • A financial services firm calculates interest accruals on loan portfolios. With lenient semantics, overflow in principal * rate calculations wraps around silently and corrupts results. ANSI mode fails fast on overflow and enforces explicit, accurate decimal arithmetic, preventing multi-million dollar calculation errors.
  • Use Case 3: Data Quality Validation Pipeline
    • An ETL pipeline validates incoming data against strict type schemas. Records with invalid type conversions must be rejected and not silently converted to NULL. ANSI mode fails tasks on first bad record, triggering alerts. Teams investigate data quality issues immediately instead of discovering corrupted data downstream.

Code

Example : Type Safety and Arithmetic Behavior

This example demonstrates how ANSI mode changes type conversion and arithmetic operations, highlighting the breaking changes relative to default behavior.

Spark : Default Mode

  1# PySpark Example - Not ANSI SQL Type Safety and Arithmetic
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
  4
  5URL_MASTER = "spark://spark-master:7077"
  6
  7
  8spark = SparkSession.builder \
  9    .appName("SPARK_DefaultSQLMode") \
 10    .config("spark.sql.ansi.enabled", "false") \
 11    .master(URL_MASTER) \
 12    .getOrCreate()
 13
 14
 15print("""
 16##########################
 17##### Build Datasets #####
 18##########################
 19""")
 20
 21# Source system 1: Financial dataset
 22# Demonstrates real-world scenarios where ANSI mode prevents errors
 23financial_data = [
 24    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),    # Standard loan
 25    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),    # Large principal
 26    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),    # High interest rate
 27    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
 28    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),    # Medium loan
 29    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),     # Zero interest (division edge case)
 30    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),   # Large loan
 31    ("LOAN008", 500, 15.00, 365, "2024-01-15"),     # Small principal, high rate
 32    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),    # Standard
 33    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),  # Very large principal (overflow in calculations)
 34]
 35
 36# Source system 2: Problematic Financial dataset
 37# Test dataset with intentionally problematic values for type conversion
 38problematic_data = [
 39    ("CONV001", "12345", "2024-01-15"),   # Valid integer string
 40    ("CONV002", "67890", "2024-01-15"),   # Valid integer string
 41    ("CONV003", "abc123", "2024-01-15"),  # Invalid integer (contains letters)
 42    ("CONV004", "99999", "2024-01-15"),   # Valid integer
 43    ("CONV005", "12.34", "2024-01-15"),   # Decimal in integer field
 44    ("CONV006", "", "2024-01-15"),        # Empty string
 45    ("CONV007", "55555", "2024-01-15"),   # Valid integer
 46    ("CONV008", "1e5", "2024-01-15"),     # Scientific notation
 47    ("CONV009", "42", "2024-01-15"),      # Valid integer
 48    ("CONV010", "NULL", "2024-01-15"),    # String "NULL" not actual NULL
 49]
 50
 51financial_schema = StructType([
 52    StructField("loan_id",  StringType()),
 53    StructField("principal", IntegerType()),
 54    StructField("annual_rate", DoubleType()),  
 55    StructField("days",   IntegerType()),
 56    StructField("date", StringType()),
 57])
 58
 59problematic_schema = StructType([
 60    StructField("record_id", StringType()),
 61    StructField("value_str", StringType()),
 62    StructField("date", StringType()),
 63])
 64
 65
 66# Create DataFrame in both sessions
 67loans_data = spark.createDataFrame(financial_data,financial_schema)
 68print(f"financial_data dataset: {loans_data.count()}")  
 69loans_data.show(truncate=False)
 70
 71
 72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
 73print(f"problematic_data dataset: {problems_data.count()}")  
 74problems_data.show(truncate=False)
 75
 76
 77# Register DataFrames as temporary views for SQL access
 78loans_data.createOrReplaceTempView("loans_data")
 79problems_data.createOrReplaceTempView("problems_data")
 80
 81
 82print("""
 83########################################
 84##### 1. Integer Overflow Behavior #####
 85########################################
 86""")
 87
 88
 89overflow = spark.sql("""
 90    SELECT 
 91        loan_id,
 92        principal,
 93        principal + 1000000 as principal_plus_1m,
 94        CASE 
 95            WHEN principal + 1000000 < 0 THEN 'OVERFLOW DETECTED'
 96            ELSE 'OK'
 97        END as overflow_check
 98    FROM loans_data
 99    WHERE loan_id IN ('LOAN004', 'LOAN010')
100""")
101
102# LOAN004 (at INT max) wrapped to negative value
103overflow.show(truncate=False)
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113
114cast = spark.sql("""
115    SELECT 
116        record_id,
117        value_str,
118        CAST(value_str AS INT) as value_int,
119        CASE 
120            WHEN CAST(value_str AS INT) IS NULL THEN 'CAST FAILED'
121            ELSE 'CAST OK'
122        END as cast_status
123    FROM problems_data
124    ORDER BY record_id
125""")
126
127# Invalid values became NULL without any error indication
128cast.show(truncate=False)
129
130
131spark.stop()

Result : Default Mode

 1##########################
 2##### Build Datasets #####
 3##########################
 4
 5financial_data dataset: 10
 6+-------+----------+-----------+----+----------+
 7|loan_id|principal |annual_rate|days|date      |
 8+-------+----------+-----------+----+----------+
 9|LOAN001|10000     |5.25       |365 |2024-01-15|
10|LOAN002|50000     |4.75       |365 |2024-01-15|
11|LOAN003|1500      |12.5       |365 |2024-01-15|
12|LOAN004|2147483647|3.0        |365 |2024-01-15|
13|LOAN005|25000     |6.0        |365 |2024-01-15|
14|LOAN006|8000      |0.0        |365 |2024-01-15|
15|LOAN007|100000    |7.25       |365 |2024-01-15|
16|LOAN008|500       |15.0       |365 |2024-01-15|
17|LOAN009|75000     |4.25       |365 |2024-01-15|
18|LOAN010|1000000   |3.5        |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date      |
24+---------+---------+----------+
25|CONV001  |12345    |2024-01-15|
26|CONV002  |67890    |2024-01-15|
27|CONV003  |abc123   |2024-01-15|
28|CONV004  |99999    |2024-01-15|
29|CONV005  |12.34    |2024-01-15|
30|CONV006  |         |2024-01-15|
31|CONV007  |55555    |2024-01-15|
32|CONV008  |1e5      |2024-01-15|
33|CONV009  |42       |2024-01-15|
34|CONV010  |NULL     |2024-01-15|
35+---------+---------+----------+
36
37
38##############################################################
39##### 1. Calculate daily interest using integer division #####
40##############################################################
41
42+-------+----------+-----------+------------------+-----------+
43|loan_id|principal |annual_rate|daily_interest    |result_type|
44+-------+----------+-----------+------------------+-----------+
45|LOAN001|10000     |5.25       |1.4383561643835616|double     |
46|LOAN002|50000     |4.75       |6.506849315068493 |double     |
47|LOAN003|1500      |12.5       |0.5136986301369864|double     |
48|LOAN004|2147483647|3.0        |176505.5052328767 |double     |
49|LOAN005|25000     |6.0        |4.109589041095891 |double     |
50|LOAN006|8000      |0.0        |0.0               |double     |
51|LOAN007|100000    |7.25       |19.863013698630137|double     |
52|LOAN008|500       |15.0       |0.2054794520547945|double     |
53|LOAN009|75000     |4.25       |8.732876712328768 |double     |
54|LOAN010|1000000   |3.5        |95.89041095890411 |double     |
55+-------+----------+-----------+------------------+-----------+
56
57
58########################################
59##### 2. Integer Overflow Behavior #####
60########################################
61
62+-------+----------+-----------------+-----------------+
63|loan_id|principal |principal_plus_1m|overflow_check   |
64+-------+----------+-----------------+-----------------+
65|LOAN004|2147483647|-2146483649      |OVERFLOW DETECTED|
66|LOAN010|1000000   |2000000          |OK               |
67+-------+----------+-----------------+-----------------+
68
69
70####################################
71##### 3. Type Casting Behavior #####
72####################################
73
74+---------+---------+---------+-----------+
75|record_id|value_str|value_int|cast_status|
76+---------+---------+---------+-----------+
77|CONV001  |12345    |12345    |CAST OK    |
78|CONV002  |67890    |67890    |CAST OK    |
79|CONV003  |abc123   |NULL     |CAST FAILED|
80|CONV004  |99999    |99999    |CAST OK    |
81|CONV005  |12.34    |12       |CAST OK    |
82|CONV006  |         |NULL     |CAST FAILED|
83|CONV007  |55555    |55555    |CAST OK    |
84|CONV008  |1e5      |NULL     |CAST FAILED|
85|CONV009  |42       |42       |CAST OK    |
86|CONV010  |NULL     |NULL     |CAST FAILED|
87+---------+---------+---------+-----------+

Spark : ANSI Mode

  1# PySpark Example - ANSI SQL Type Safety and Arithmetic
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
  4
  5URL_MASTER = "spark://spark-master:7077"
  6
  7
  8spark = SparkSession.builder \
  9    .appName("SPARK_ANSISQLMode") \
 10    .config("spark.sql.ansi.enabled", "true") \
 11    .master(URL_MASTER) \
 12    .getOrCreate()
 13
 14
 15print("""
 16##########################
 17##### Build Datasets #####
 18##########################
 19""")
 20
 21# Source system 1: Financial dataset
 22# Demonstrates real-world scenarios where ANSI mode prevents errors
 23financial_data = [
 24    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),    # Standard loan
 25    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),    # Large principal
 26    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),    # High interest rate
 27    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
 28    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),    # Medium loan
 29    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),     # Zero interest (division edge case)
 30    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),   # Large loan
 31    ("LOAN008", 500, 15.00, 365, "2024-01-15"),     # Small principal, high rate
 32    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),    # Standard
 33    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),  # Very large principal (overflow in calculations)
 34]
 35
 36# Source system 2: Problematic Financial dataset
 37# Test dataset with intentionally problematic values for type conversion
 38problematic_data = [
 39    ("CONV001", "12345", "2024-01-15"),   # Valid integer string
 40    ("CONV002", "67890", "2024-01-15"),   # Valid integer string
 41    ("CONV003", "abc123", "2024-01-15"),  # Invalid integer (contains letters)
 42    ("CONV004", "99999", "2024-01-15"),   # Valid integer
 43    ("CONV005", "12.34", "2024-01-15"),   # Decimal in integer field
 44    ("CONV006", "", "2024-01-15"),        # Empty string
 45    ("CONV007", "55555", "2024-01-15"),   # Valid integer
 46    ("CONV008", "1e5", "2024-01-15"),     # Scientific notation
 47    ("CONV009", "42", "2024-01-15"),      # Valid integer
 48    ("CONV010", "NULL", "2024-01-15"),    # String "NULL" not actual NULL
 49]
 50
 51financial_schema = StructType([
 52    StructField("loan_id",  StringType()),
 53    StructField("principal", IntegerType()),
 54    StructField("annual_rate", DoubleType()),  
 55    StructField("days",   IntegerType()),
 56    StructField("date", StringType()),
 57])
 58
 59problematic_schema = StructType([
 60    StructField("record_id", StringType()),
 61    StructField("value_str", StringType()),
 62    StructField("date", StringType()),
 63])
 64
 65
 66# Create DataFrame in both sessions
 67loans_data = spark.createDataFrame(financial_data,financial_schema)
 68print(f"financial_data dataset: {loans_data.count()}")  
 69loans_data.show(truncate=False)
 70
 71
 72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
 73print(f"problematic_data dataset: {problems_data.count()}")  
 74problems_data.show(truncate=False)
 75
 76
 77# Register DataFrames as temporary views for SQL access
 78loans_data.createOrReplaceTempView("loans_data")
 79problems_data.createOrReplaceTempView("problems_data")
 80
 81
 82print("""
 83########################################
 84##### 1. Integer Overflow Behavior #####
 85########################################
 86""")
 87
 88
 89try:
 90    overflow = spark.sql("""
 91        SELECT 
 92            loan_id,
 93            principal,
 94            principal + 1000000 as principal_plus_1m
 95        FROM loans_data
 96        WHERE loan_id IN ('LOAN004', 'LOAN010')
 97    """)
 98    overflow.show(truncate=False)
 99    print("Query completed (no overflow detected)")
100except Exception as e:
101    print(f"Exception Caught: {type(e).__name__}")
102    print(f"Message: {str(e)[:200]}")
103    print("\nThis is EXPECTED in ANSI mode - overflow is an error, not silent corruption")
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113try:
114    cast = spark.sql("""
115        SELECT 
116            record_id,
117            value_str,
118            CAST(value_str AS INT) as value_int
119        FROM problems_data
120        ORDER BY record_id
121    """)
122    cast.show(truncate=False)
123except Exception as e:
124    print(f"Exception Caught: {type(e).__name__}")
125    print(f"Failed on record: {str(e)[str(e).find('CONV'):str(e).find('CONV')+10] if 'CONV' in str(e) else 'unknown'}")
126    print("\nThis is EXPECTED in ANSI mode - forces explicit error handling")
127
128
129print("\n##### with TRY_CAST: Safe Error Handling #####")
130try_cast = spark.sql("""
131    SELECT 
132        record_id,
133        value_str,
134        TRY_CAST(value_str AS INT) as value_int,
135        CASE 
136            WHEN TRY_CAST(value_str AS INT) IS NULL THEN 'CONVERSION_ERROR'
137            ELSE 'OK'
138        END as validation_status
139    FROM problems_data
140    ORDER BY record_id
141""")
142# TRY_CAST provides NULL on error but explicitly marks rows as problematic
143try_cast.show(truncate=False)
144
145
146spark.stop()

Result : ANSI Mode

 1##########################
 2##### Build Datasets #####
 3##########################
 4
 5financial_data dataset: 10
 6+-------+----------+-----------+----+----------+
 7|loan_id|principal |annual_rate|days|date      |
 8+-------+----------+-----------+----+----------+
 9|LOAN001|10000     |5.25       |365 |2024-01-15|
10|LOAN002|50000     |4.75       |365 |2024-01-15|
11|LOAN003|1500      |12.5       |365 |2024-01-15|
12|LOAN004|2147483647|3.0        |365 |2024-01-15|
13|LOAN005|25000     |6.0        |365 |2024-01-15|
14|LOAN006|8000      |0.0        |365 |2024-01-15|
15|LOAN007|100000    |7.25       |365 |2024-01-15|
16|LOAN008|500       |15.0       |365 |2024-01-15|
17|LOAN009|75000     |4.25       |365 |2024-01-15|
18|LOAN010|1000000   |3.5        |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date      |
24+---------+---------+----------+
25|CONV001  |12345    |2024-01-15|
26|CONV002  |67890    |2024-01-15|
27|CONV003  |abc123   |2024-01-15|
28|CONV004  |99999    |2024-01-15|
29|CONV005  |12.34    |2024-01-15|
30|CONV006  |         |2024-01-15|
31|CONV007  |55555    |2024-01-15|
32|CONV008  |1e5      |2024-01-15|
33|CONV009  |42       |2024-01-15|
34|CONV010  |NULL     |2024-01-15|
35+---------+---------+----------+
36
37
38########################################
39##### 1. Integer Overflow Behavior #####
40########################################
41
42== SQL (line 5, position 13) ==
43            principal + 1000000 as principal_plus_1m
44            ^^^^^^^^^^^^^^^^^^^
45
46	at org.apache.spark.sql.errors.ExecutionErrors.arithmeticOverflowError(ExecutionErrors.scala:132)
47	...
48	at java.base/java.lang.Thread.run(Thread.java:1583)
49
50
51Exception Caught: ArithmeticException
52Message: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22003
53== SQL (l
54
55This is EXPECTED in ANSI mode - overflow is an error, not silent corruption
56
57####################################
58##### 2. Type Casting Behavior #####
59####################################
60
61== SQL (line 5, position 13) ==
62            CAST(value_str AS INT) as value_int
63            ^^^^^^^^^^^^^^^^^^^^^^
64
65	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
66	...
67	at java.base/java.lang.Thread.run(Thread.java:1583)
68
69Exception Caught: NumberFormatException
70Failed on record: unknown
71
72This is EXPECTED in ANSI mode - forces explicit error handling
73
74##### with TRY_CAST: Safe Error Handling #####
75
76+---------+---------+---------+-----------------+
77|record_id|value_str|value_int|validation_status|
78+---------+---------+---------+-----------------+
79|CONV001  |12345    |12345    |OK               |
80|CONV002  |67890    |67890    |OK               |
81|CONV003  |abc123   |NULL     |CONVERSION_ERROR |
82|CONV004  |99999    |99999    |OK               |
83|CONV005  |12.34    |NULL     |CONVERSION_ERROR |
84|CONV006  |         |NULL     |CONVERSION_ERROR |
85|CONV007  |55555    |55555    |OK               |
86|CONV008  |1e5      |NULL     |CONVERSION_ERROR |
87|CONV009  |42       |42       |OK               |
88|CONV010  |NULL     |NULL     |CONVERSION_ERROR |
89+---------+---------+---------+-----------------+

Session SQL Variables

Introduction

Session SQL variables enable dynamic parameterization of queries without string concatenation or external configuration files. Variables are session-scoped and type-safe.

Detail

Before Spark 4.0, parameterizing SQL queries required one of three problematic approaches:

  1. String Interpolation in Application Code
    • Risk: SQL injection if country comes from user input. Requires recompiling query plan for each parameter value.
    • Example :
1country = "US"
2df = spark.sql(f"SELECT * FROM orders WHERE country = '{country}'")
  2. Spark Configuration Properties
    • Verbose syntax. Limited to string types. Configuration namespace pollution.
    • Example :
1spark.conf.set("spark.my.param.pays", "US")
2spark.sql("SELECT * FROM orders WHERE country = '${spark.my.param.pays}'")
  3. External Configuration Files
    • Adds dependency on external systems. Configuration drift between environments. No type safety.
    • Example :
1with open("config.yaml") as f:
2    params = yaml.safe_load(f)
3spark.sql(f"SELECT * FROM orders WHERE country = '{params['country']}'")

Spark 4.0 introduces SQL variables that solve these problems. Variables are declared with explicit types and scoped to the session:

1DECLARE min_amount DECIMAL(10,2) DEFAULT 100.0;
2SELECT * FROM orders WHERE amount >= min_amount;

The two supported syntaxes :

  1. DECLARE Statement (SQL Standard)
    • Explicit type declaration. Required in ANSI SQL mode.
    • Example :
1DECLARE variable_name TYPE [DEFAULT value];
  2. SET VARIABLE Statement
    • Assigns a new value to a previously declared variable. The value is cast to the variable’s declared type, so variables stay type-safe once set.
    • Example :
1SET VARIABLE variable_name = value;

Variable references are resolved during query compilation, not at runtime. This enables the Catalyst optimizer to use variable values for predicate pushdown and partition pruning.

The variable system maintains a session-level symbol table. Each variable has:

  • Name: Identifier used in queries
  • Type: Scalar data type (INT, STRING, DECIMAL, BOOLEAN, DATE, TIMESTAMP)
  • Value: Current value (mutable with subsequent SET)
  • Scope: Session-only (not visible to other sessions)

Variables integrate with query planning. When Catalyst analyzes a query that references variable_name, it:

  1. Looks up variable in session symbol table
  2. Replaces reference with literal value
  3. Proceeds with optimization using concrete value

Runtime performance is identical to hardcoded literals.
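The three resolution steps above can be sketched with a toy symbol table (plain Python; `resolve` and `session_vars` are illustrative names, not Catalyst internals):

```python
import re

# Session-level symbol table: variable name -> (declared type, current value)
session_vars = {
    "min_amount": ("DECIMAL(10,2)", 100.0),
    "target_location": ("STRING", "warehouse_A"),
}

def resolve(query: str) -> str:
    """Replace known variable names with typed literals before optimization."""
    def substitute(match):
        name = match.group(0)
        if name in session_vars:
            var_type, value = session_vars[name]
            # Quote string values as SQL literals; numbers pass through as-is.
            return f"'{value}'" if var_type == "STRING" else str(value)
        return name  # not a variable: leave the identifier untouched
    return re.sub(r"\b\w+\b", substitute, query)

print(resolve("SELECT * FROM orders WHERE amount >= min_amount"))
# SELECT * FROM orders WHERE amount >= 100.0
```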

Advantages

  1. Type-Safe Parameterization : Variables have explicit types checked at declaration. Assigning wrong type produces compilation error. Prevents runtime type errors from misconfigured parameters.
  2. SQL Injection Prevention : Variable references are resolved during query compilation, not runtime string substitution. Malicious input in variable values cannot alter query structure.
  3. Simplified Multi-Environment Configuration : Same SQL script runs in dev/staging/prod with different variable values. No code changes between environments. Variables can be set from command-line arguments or environment variables at session startup.
  4. Optimizer-Aware Parameterization : Catalyst optimizer sees variable values during planning. Enables partition pruning and predicate pushdown based on variable values.

Limitations

  1. Session-Scoped Only : Variables don’t persist after session termination. No global variables shared across concurrent sessions. Each client must redeclare variables. Unsuitable for cluster-wide configuration that needs consistency across jobs.
  2. No Complex Types : Variables support only scalar types. Cannot declare ARRAY, STRUCT, or MAP variables. Passing complex parameter values still requires configuration properties or external storage.
  3. No Variable Persistence APIs : Variables cannot be saved to metastore or external storage. No built-in mechanism to load variables from YAML/JSON files. Teams must build custom variable management on top of base SQL variables.
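Such custom variable management can be as simple as generating DECLARE statements from a config file, which a session-setup step would then feed to spark.sql() one by one. A sketch (JSON used for portability; `declares_from_config` is a hypothetical helper, and config values must come from trusted sources, since they are spliced into DDL text):

```python
import json

def declares_from_config(config_text: str) -> list[str]:
    """Turn a JSON config of {name: {type, default}} into DECLARE statements."""
    stmts = []
    for name, spec in json.loads(config_text).items():
        var_type, default = spec["type"], spec["default"]
        # Quote string defaults as SQL literals; numeric defaults pass through.
        literal = f"'{default}'" if var_type == "STRING" else str(default)
        stmts.append(f"DECLARE VARIABLE {name} {var_type} DEFAULT {literal}")
    return stmts

config = '{"source_database": {"type": "STRING", "default": "dev_db"}}'
print(declares_from_config(config))
# ["DECLARE VARIABLE source_database STRING DEFAULT 'dev_db'"]
```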

Real-World Use Cases

  • Use Case 1 : Parameterized Reports
    • Define a date range at the beginning of the script and use it in multiple queries without repetition.
  • Use Case 2 : Environment-Specific Data Processing
    • An ETL pipeline runs in dev, staging, and production environments. Each environment reads from different database prefixes (dev_db.orders, prod_db.orders). Variables source_database and target_database are set from environment variables at job startup. Same SQL script works across all environments without modification.

Code

Example : How SQL session variables work

Spark

  1# PySpark Example : Demonstrates how session variables work
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import DoubleType, StringType, StructField, StructType
  4from pyspark.sql.functions import to_timestamp
  5
  6URL_MASTER = "spark://spark-master:7077"
  7
  8
  9spark = SparkSession.builder \
 10    .appName("SPARK_APPVARIABLE") \
 11    .config("spark.sql.ansi.enabled", "true") \
 12    .master(URL_MASTER) \
 13    .getOrCreate()
 14
 15
 16print("""
 17##########################
 18##### Build Datasets #####
 19##########################
 20""")
 21
 22# Source system 1: Sensor dataset
 23# Sample sensor readings used throughout the variable examples
 24sensor_readings = [
 25    ("SENSOR_01", "2024-01-15 10:00:00", 22.5, 45.2, "warehouse_A", "normal"),
 26    ("SENSOR_02", "2024-01-15 10:05:00", 35.8, 78.5, "warehouse_A", "normal"),  # High temp
 27    ("SENSOR_03", "2024-01-15 10:10:00", 18.2, 42.1, "warehouse_B", "normal"),
 28    ("SENSOR_01", "2024-01-15 10:15:00", 23.1, 46.8, "warehouse_A", "normal"),
 29    ("SENSOR_02", "2024-01-15 10:20:00", 38.5, 82.3, "warehouse_A", "alert"),   # Anomaly
 30    ("SENSOR_04", "2024-01-15 10:25:00", 15.2, 38.5, "warehouse_C", "normal"),  # Low temp
 31    ("SENSOR_03", "2024-01-15 10:30:00", 19.1, 43.7, "warehouse_B", "normal"),
 32    ("SENSOR_01", "2024-01-15 10:35:00", 24.2, 47.5, "warehouse_A", "normal"),
 33    ("SENSOR_04", "2024-01-15 10:40:00", 12.8, 35.2, "warehouse_C", "alert"),   # Anomaly
 34    ("SENSOR_02", "2024-01-15 10:45:00", 32.1, 72.5, "warehouse_A", "normal"),
 35]
 36
 37
 38sensor_schema = StructType([
 39    StructField("sensor_id",  StringType()),
    StructField("cur_timestamp", StringType()),
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("location", StringType()),
    StructField("status", StringType()),
])


# Create DataFrame
sensors_df = spark.createDataFrame(sensor_readings, sensor_schema).withColumn("cur_timestamp", to_timestamp("cur_timestamp", format="yyyy-MM-dd HH:mm:ss"))
print(f"sensor dataset: {sensors_df.count()}")
sensors_df.show(truncate=False)

# Register the DataFrame as a temporary view for SQL access
sensors_df.createOrReplaceTempView("sensors_data")



print("""
########################################
##### 1. Time-Based Variable Usage #####
########################################
""")

spark.sql("DECLARE VARIABLE analysis_start TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:00:00'")
spark.sql("DECLARE VARIABLE analysis_end TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:15:00'")

query_time_based = """
    SELECT
        sensor_id,
        location,
        COUNT(*) as readings_in_window,
        AVG(temperature) as avg_temp,
        MAX(temperature) - MIN(temperature) as temp_range,
        analysis_start as window_start,
        analysis_end as window_end
    FROM sensors_data
    WHERE cur_timestamp BETWEEN analysis_start AND analysis_end
    GROUP BY sensor_id, location
    ORDER BY sensor_id, location;
"""

print("EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00")
time_based = spark.sql(query_time_based)
print(f"Result records: {time_based.count()}")
time_based.show(truncate=False)


print("EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00")
spark.sql("SET VARIABLE analysis_end = TIMESTAMP'2024-01-15 10:30:00'")
time_based = spark.sql(query_time_based)
print(f"Result records: {time_based.count()}")
time_based.show(truncate=False)



print("""
################################################
##### 2. Variables vs String Interpolation #####
################################################
""")
spark.sql("DECLARE VARIABLE target_location STRING DEFAULT 'warehouse_A'")

print("UNSAFE: String interpolation (vulnerable to SQL injection)")
def get_user_input() -> str:
    return "warehouse_A' OR '1'='1"

target_location = get_user_input()
unsafe_interpolation = spark.sql(f"SELECT * FROM sensors_data WHERE location = '{target_location}'")
print(f"Result records: {unsafe_interpolation.count()}")
unsafe_interpolation.show(truncate=False)


print("SAFE: Variable substitution (injection-proof)")
# Malicious input becomes a string value, not SQL code
spark.sql("SET VARIABLE target_location = \"warehouse_A' OR '1'='1\"")
safe_interpolation = spark.sql("SELECT * FROM sensors_data WHERE location = target_location")
print(f"Result records: {safe_interpolation.count()}")
safe_interpolation.show(truncate=False)
# The variable value is treated as the string literal "warehouse_A' OR '1'='1"



spark.stop()

Result

##########################
##### Build Datasets #####
##########################

sensor dataset: 10
+---------+-------------------+-----------+--------+-----------+------+
|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
+---------+-------------------+-----------+--------+-----------+------+
|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
+---------+-------------------+-----------+--------+-----------+------+


########################################
##### 1. Time-Based Variable Usage #####
########################################

EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00
Result records: 3
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:15:00|
|SENSOR_02|warehouse_A|1                 |35.8    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
|SENSOR_03|warehouse_B|1                 |18.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+

EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00
Result records: 4
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_02|warehouse_A|2                 |37.15   |2.700000000000003 |2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_03|warehouse_B|2                 |18.65   |0.9000000000000021|2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_04|warehouse_C|1                 |15.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:30:00|
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+


################################################
##### 2. Variables vs String Interpolation #####
################################################

UNSAFE: String interpolation (vulnerable to SQL injection)
Result records: 10
+---------+-------------------+-----------+--------+-----------+------+
|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
+---------+-------------------+-----------+--------+-----------+------+
|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
+---------+-------------------+-----------+--------+-----------+------+

SAFE: Variable substitution (injection-proof)
Result records: 0
+---------+-------------+-----------+--------+--------+------+
|sensor_id|cur_timestamp|temperature|humidity|location|status|
+---------+-------------+-----------+--------+--------+------+
+---------+-------------+-----------+--------+--------+------+

Multi-Statement SQL Scripts

Introduction

Multi-statement SQL scripts enable executing multiple SQL commands in a single script file with procedural control flow. Supports variable declarations, conditional logic (IF/ELSE), loops (WHILE, FOR), exception handling (BEGIN/END blocks), and statement sequencing.

Detail

Before Spark 4.0, executing multiple SQL statements required either:

  1. Application-Level Orchestration : Each statement is a separate API call. No transactional semantics. Failure handling requires try-catch blocks in application code.

spark.sql("CREATE TABLE staging AS SELECT * FROM source")
spark.sql("DELETE FROM staging WHERE invalid = true")
spark.sql("INSERT INTO target SELECT * FROM staging")

  2. External Workflow Tools : Adds operational complexity. Requires separate orchestration infrastructure. Simple SQL logic becomes Python/YAML configuration.
  3. Multiple Spark SQL Invocations : No state sharing between invocations. Cannot pass variables. Poor performance due to repeated session initialization.

Spark 4.0 introduces multi-statement scripts that execute as a single unit. Scripts contain multiple SQL statements separated by semicolons, with procedural control flow.

The script parser breaks input into individual statements and builds an execution plan. Each statement executes sequentially. Variables declared in early statements are available to later statements. Control flow statements (IF/ELSE, WHILE) alter execution order based on runtime conditions.

  • Scripts execute on the Spark driver.
  • Control flow is evaluated on the driver, not distributed to executors.
  • Each statement goes through normal Catalyst optimization and execution.
  • The execution model is sequential, not transactional : if statement 3 fails, statements 1 and 2 have already committed their changes. There is no automatic rollback. This differs from traditional database stored procedures, which often provide transaction semantics.

Key language constructs :
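
A minimal sketch of these constructs inside a single BEGIN/END compound statement (the identifiers here are illustrative, not taken from the examples below):

```sql
BEGIN
    DECLARE i INT DEFAULT 0;               -- variable declaration
    DECLARE state STRING DEFAULT 'none';

    IF i = 0 THEN                          -- conditional logic
        SET state = 'starting';
    ELSE
        SET state = 'resuming';
    END IF;

    WHILE i < 3 DO                         -- condition-driven loop
        SET i = i + 1;
    END WHILE;

    FOR r AS SELECT id FROM range(3) DO    -- loop over a query result
        SELECT r.id;
    END FOR;

    SELECT state, i;                       -- result of the final statement
END;
```

When such a script is submitted through spark.sql, the DataFrame returned corresponds to the result of the last statement executed.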

Advantages

  1. Reduced External Dependencies : Simple ETL pipelines no longer require specific orchestration tools. SQL scripts can implement validation, transformation, and loading logic in a single file. Reduces operational complexity and infrastructure costs.
  2. Procedural Logic in SQL : IF/ELSE and WHILE constructs enable business logic implementation directly in SQL. Reduces context switching between SQL and Python/Scala. Teams with strong SQL skills can build complete pipelines without application code.
  3. Version Control and Portability : Scripts are plain text files that live in version control. Changes are tracked through Git. Scripts are portable across Spark deployments (local, cluster, Connect). Easier to review and audit than application code spread across multiple files.
  4. Server-Side Execution : With Spark Connect, the entire script is sent to the server and executes remotely. This reduces network round-trips compared to per-statement submission. The client only receives the final result, not intermediate state.

Limitations

  1. No Transaction Rollback : Failed statements do not automatically undo previous statements. If a script creates a staging table, loads data, then fails on validation, the staging table remains. Manual cleanup is required. This is fundamentally different from ACID databases.
  2. Limited Debugging Capabilities : No interactive debugger. No breakpoints. No variable inspection during execution. Debugging requires adding SELECT statements to print intermediate values. Error messages may not clearly indicate which statement failed in complex scripts.
  3. Complexity Threshold : Complex logic is better implemented in application code with proper testing frameworks. Scripts should remain linear data pipelines, not full applications.
  4. Error Handling Gaps : Cannot catch specific error types with the granularity of try-catch in Python/Scala. Cannot continue execution after certain errors. The script failure model is all-or-nothing for many error types.

Real-World Use Cases

  • Use Case 1 : Incremental Loading
    • A loop that processes 24 hourly partitions one by one.
    • A script that adds partitions for the next 30 days, skipping any that already exist.
  • Use Case 2 : Environment-Specific Deployment
    • Deployment scripts must create different table structures in dev vs prod (dev has additional debug columns). The script uses IF/ELSE on an environment variable to execute the appropriate CREATE TABLE statements. A single script replaces separate dev and prod SQL files.
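
As a hedged sketch of the hourly-partition loop from Use Case 1 (the table and column names, hourly_metrics, raw_events and hour_of_day, are hypothetical):

```sql
BEGIN
    -- Process 24 hourly partitions one by one
    DECLARE h INT DEFAULT 0;
    WHILE h < 24 DO
        INSERT INTO hourly_metrics              -- hypothetical target table
        SELECT hour_of_day, COUNT(*) AS events
        FROM raw_events                         -- hypothetical source table
        WHERE hour_of_day = h
        GROUP BY hour_of_day;
        SET h = h + 1;
    END WHILE;
END;
```

Processing one partition per iteration keeps each INSERT small; the trade-off is 24 sequential jobs instead of one larger, parallel one.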

Codes

Example : Execute multi-statement script with Spark Connect

Spark

Content of Spark Connect script :

from pyspark.sql import SparkSession

REMOTE_URL = "sc://localhost:15002"
SCRIPT_SQL_NAME = "example_SQL.sql"

# Connect to Spark Connect
spark = SparkSession.builder \
    .remote(REMOTE_URL) \
    .appName("SPARK_SQLScript") \
    .config("spark.sql.ansi.enabled", "true") \
    .getOrCreate()

print("Successfully connected to Spark!")
print(f"Spark version: {spark.version}")

# Read the SQL script
with open(SCRIPT_SQL_NAME, "r", encoding="utf-8") as f:
    sql_script = f.read()

# Execute the whole SQL script with a single spark.sql call
result = spark.sql(sql_script)
result.show()

Content of the example_SQL.sql SQL script file :

-- Spark 4.x - SQL Scripting

BEGIN
    -- =============================================
    -- STEP n°1 : VARIABLES
    -- =============================================
    DECLARE batch_date DATE DEFAULT CURRENT_DATE();

    -- Business thresholds
    DECLARE min_valid_amount DECIMAL(10,2) DEFAULT 10.00;
    DECLARE standard_threshold DECIMAL(10,2) DEFAULT 100.00;
    DECLARE premium_threshold DECIMAL(10,2) DEFAULT 1000.00;

    -- Metrics
    DECLARE total_processed INT DEFAULT 0;
    DECLARE total_invalid INT DEFAULT 0;
    DECLARE total_revenue DECIMAL(15,2) DEFAULT 0.0;

    -- =============================================
    -- STEP n°2 : TABLES INITIALIZATION
    -- =============================================

    -- Staging table for raw data
    DROP TABLE IF EXISTS raw_orders;
    CREATE TABLE raw_orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        amount DECIMAL(10,2),
        status STRING,
        region STRING
    ) USING parquet;

    -- Table of validated orders
    DROP TABLE IF EXISTS validated_orders;
    CREATE TABLE validated_orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        amount DECIMAL(10,2),
        region STRING,
        order_category STRING,
        loyalty_points DECIMAL(10,2),
        processing_date DATE
    ) USING parquet;

    -- Quarantine table for invalid data
    DROP TABLE IF EXISTS invalid_orders;
    CREATE TABLE invalid_orders (
        order_id BIGINT,
        customer_id BIGINT,
        amount DECIMAL(10,2),
        error_reason STRING,
        quarantine_timestamp TIMESTAMP
    ) USING parquet;

    -- =============================================
    -- STEP n°3 : DATASETS
    -- =============================================

    INSERT INTO raw_orders VALUES
        (1001, 501, DATE'2024-01-15', 150.50, 'completed', 'EMEA'),
        (1002, 502, DATE'2024-01-15', 1250.00, 'completed', 'AMER'),
        (1003, 503, DATE'2024-01-15', -50.00, 'completed', 'APAC'), -- Invalid: negative
        (1004, 504, DATE'2024-01-15', 890.00, 'completed', 'EMEA'),
        (2001, 505, DATE'2024-01-16', 2100.00, 'completed', 'AMER'),
        (2002, 506, DATE'2024-01-16', NULL, 'completed', 'APAC'), -- Invalid: NULL amount
        (2003, 507, DATE'2024-01-16', 450.00, 'completed', 'EMEA'),
        (2004, 508, DATE'2024-01-16', 5.00, 'completed', 'AMER'), -- Invalid: below minimum
        (3001, 509, DATE'2024-01-17', 750.00, 'completed', 'APAC'),
        (3002, 510, DATE'2024-01-17', 1500.00, 'completed', 'EMEA'),
        (3003, 511, DATE'2024-01-17', 220.00, 'completed', 'AMER');

    -- =============================================
    -- STEP n°4 : DATA VALIDATION
    -- =============================================

    -- Quarantining invalid records
    INSERT INTO invalid_orders
    SELECT
        order_id,
        customer_id,
        amount,
        CASE
            WHEN amount IS NULL THEN 'NULL_AMOUNT'
            WHEN amount < 0 THEN 'NEGATIVE_AMOUNT'
            WHEN amount < min_valid_amount THEN 'BELOW_MINIMUM'
            ELSE 'UNKNOWN_ERROR'
        END AS error_reason,
        CURRENT_TIMESTAMP() AS quarantine_timestamp
    FROM raw_orders
    WHERE status = 'completed'
        AND (amount IS NULL
            OR amount < 0
            OR amount < min_valid_amount);

    -- Counting invalid rows
    SET total_invalid = (SELECT COUNT(*) FROM invalid_orders);

    -- =============================================
    -- STEP n°5 : VALID DATA TRANSFORMATION
    -- =============================================

    INSERT INTO validated_orders
    SELECT
        order_id,
        customer_id,
        order_date,
        amount,
        region,
        -- CASE WHEN classification
        CASE
            WHEN amount >= premium_threshold THEN 'PREMIUM'
            WHEN amount >= standard_threshold THEN 'STANDARD'
            ELSE 'BASIC'
        END AS order_category,
        -- Calculation of loyalty points
        CASE
            WHEN amount >= premium_threshold THEN amount * 0.10
            WHEN amount >= standard_threshold THEN amount * 0.05
            ELSE amount * 0.02
        END AS loyalty_points,
        batch_date AS processing_date
    FROM raw_orders
    WHERE status = 'completed'
        AND amount IS NOT NULL
        AND amount >= min_valid_amount;

    -- Counting total rows processed
    SET total_processed = (SELECT COUNT(*) FROM validated_orders WHERE processing_date = batch_date);

    -- Revenue calculation
    SET total_revenue = (SELECT COALESCE(SUM(amount), 0) FROM validated_orders WHERE processing_date = batch_date);

    -- =============================================
    -- STEP n°6 : GENERATION OF FINAL REPORTS
    -- =============================================
    SELECT '========== EXECUTION - SUMMARY ==========' AS message;

    -- Execution report
    SELECT 'Batch Date' as `Metric`, CAST(batch_date AS STRING) as `Value`
    UNION ALL SELECT 'Total Valid Orders', CAST(total_processed AS STRING)
    UNION ALL SELECT 'Total Invalid Orders', CAST(total_invalid AS STRING)
    UNION ALL SELECT 'Total Revenue', CAST(total_revenue AS STRING)
    UNION ALL SELECT 'Invalid Ratio %', CAST(ROUND((total_invalid * 100.0) / GREATEST(total_processed + total_invalid, 1), 2) AS STRING);

    -- =============================================
    -- STEP n°7 : DROP TABLES
    -- =============================================
    DROP TABLE IF EXISTS raw_orders;
    DROP TABLE IF EXISTS validated_orders;
    DROP TABLE IF EXISTS invalid_orders;

END;

Result

Successfully connected to Spark!
Spark version: 4.1.1
+--------------------+----------+
|              Metric|     Value|
+--------------------+----------+
|          Batch Date|2026-03-10|
|  Total Valid Orders|         8|
|Total Invalid Orders|         3|
|       Total Revenue|   7310.50|
|     Invalid Ratio %|     27.27|
+--------------------+----------+