Spark : v4.x - Features - SQL Scripts, Session and ANSI compliance
This article covers the SQL features (scripting, session variables, ANSI compliance) of Spark v4.x.
ANSI SQL Compliance
Introduction
ANSI SQL compliance mode enforces stricter SQL semantics that match the ISO/IEC 9075 standard: it enables type safety, prevents implicit casting errors, and enforces reserved keyword restrictions.
Detail
Spark’s default SQL behavior prioritizes ease of use over strict correctness. Implicit type conversions silently succeed even when they are semantically wrong. Arithmetic overflow wraps around without raising errors. Division by zero returns NULL instead of failing. Reserved keywords can be used as column names without quoting.
ANSI SQL compliance fundamentally changes these behaviors to match traditional databases such as PostgreSQL and SQL Server. The changes affect three primary areas:
- Type System Strictness : Queries fail instead of silently returning truncated or NULL results.
- NULL Handling : ANSI mode enforces stricter NULL semantics in predicates and set operations. NULL = NULL always returns NULL (not true). UNION, INTERSECT, and EXCEPT treat NULL values as equal for deduplication (matching the SQL standard).
- Reserved Keywords : ANSI mode enforces SQL reserved keyword restrictions. Column names like SELECT, FROM, WHERE must be backtick-quoted. This matches traditional database behavior but breaks Spark SQL queries that use these names freely.
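These three behaviors can be sketched in a few lines of Spark SQL (the events table and its columns are illustrative, not part of the examples below):

```sql
-- Type strictness: fails instead of silently returning NULL
SELECT CAST('abc' AS INT);            -- error in ANSI mode, NULL in default mode

-- NULL handling: equality with NULL yields NULL, not true
SELECT NULL = NULL;

-- Reserved keywords: must be backtick-quoted in ANSI mode
SELECT `order`, `timestamp` FROM events;
```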
The implementation affects multiple Spark subsystems:
- Catalyst Optimizer: Adds type validation rules during the analysis phase. Rejects queries with type mismatches earlier in planning.
- Code Generation: Changes generated code for arithmetic operations to include overflow checks. Adds branch instructions that check for overflow conditions before returning results.
- Runtime Execution: Wraps operations in exception handlers that fail tasks on ANSI violations. This changes error handling from silent NULL propagation to explicit failures.
The mode is configured per-session via spark.sql.ansi.enabled. Once enabled, all queries in that session follow ANSI rules.
Advice
spark.sql.ansi.enabled = true :
- Master switch for ANSI compliance. Enables all strict behaviors: overflow checking, type safety, division semantics.
- Must be set before creating the session for consistent behavior.
- Cannot be changed mid-session in some Spark deployment modes.

TRY_CAST vs CAST in ANSI Mode :
- CAST() throws exceptions on conversion errors in ANSI mode. TRY_CAST() returns NULL but explicitly signals validation failure.
- Use TRY_CAST() for data validation pipelines where bad rows should be filtered or logged rather than failing the entire job.

typeof() Function :
- Returns the data type of an expression result. Shows whether the result is INT, DECIMAL, DOUBLE, etc. Use it in migration validation to identify type changes.

Reserved Keyword Quoting :
- ANSI mode enforces backtick quoting for reserved words. Queries with column names like user, timestamp, order must be rewritten. This is a breaking change for large SQL codebases.
- Run static analysis to identify affected queries before migration.
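The TRY_CAST and typeof functions combine naturally in a validation query; a minimal sketch, assuming a staging table raw_data with a string column amount_str:

```sql
SELECT
    amount_str,
    TRY_CAST(amount_str AS DECIMAL(10,2)) AS amount,               -- NULL instead of an exception
    typeof(TRY_CAST(amount_str AS DECIMAL(10,2))) AS amount_type,  -- shows the resulting type
    CASE WHEN TRY_CAST(amount_str AS DECIMAL(10,2)) IS NULL
         THEN 'REJECT' ELSE 'OK'
    END AS validation_status
FROM raw_data;
```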
Advantages
- Improved Type Safety : Strict type checking catches errors at query compilation instead of runtime. Schema evolution issues appear earlier. Prevents data quality issues from propagating through data pipelines.
- Fail-Fast Error Detection : Type mismatches and arithmetic errors fail immediately instead of producing silent incorrect results. A pipeline that processes financial transactions won’t silently corrupt data on overflow. Errors surface during testing rather than in production.
- Database Migration Compatibility : Queries that worked correctly in the source database behave identically in Spark, reducing migration risk and testing burden.
- Regulatory Compliance : Financial services and healthcare industries require deterministic, auditable query behavior. ANSI mode provides predictable semantics that match regulatory expectations.
Limitations
- Breaking Changes to Existing Queries : Queries that relied on Spark’s lenient behavior will fail. CAST operations that previously returned NULL now throw exceptions, and division by zero raises an error instead of returning NULL. Large codebases require extensive testing and refactoring.
- Performance Overhead : Each addition, multiplication, and division includes conditional branches to check for overflow. Workloads with extensive calculations on large volumes of data may see a measurable performance decline.
- All-or-Nothing Configuration : Cannot enable ANSI mode for specific queries or tables. The session-wide setting affects all operations. Gradual migration requires running two separate Spark sessions with different configurations.
- Error Messages Can Fail Entire Jobs : A single row with a type conversion error fails the entire task. In default mode, bad rows become NULL and processing continues. ANSI mode lacks row-level error recovery. Requires pre-validation or a separate error handling pipeline.
Real-World Use Cases
- Use Case 1: PostgreSQL to Spark Migration
- A company migrates 500+ SQL queries from PostgreSQL data warehouse to Spark lakehouse. Queries contain integer division, overflow-prone calculations, and strict type dependencies. ANSI mode enables running queries unmodified. Without ANSI mode, a significant percentage of queries may need to be rewritten to match PostgreSQL semantics.
- Use Case 2: Financial Calculation Accuracy
- A financial services firm calculates interest accruals on loan portfolios. With lenient defaults, an accrual such as principal * rate / 365 can silently overflow on large principals, and integer operands truncate in engines where / is integer division. ANSI mode fails fast on overflow, and explicit DECIMAL division produces accurate results. Prevents multi-million dollar calculation errors.
- Use Case 3: Data Quality Validation Pipeline
- An ETL pipeline validates incoming data against strict type schemas. Records with invalid type conversions must be rejected and not silently converted to NULL. ANSI mode fails tasks on first bad record, triggering alerts. Teams investigate data quality issues immediately instead of discovering corrupted data downstream.
Codes
Example : Type Safety and Arithmetic Behavior
This example demonstrates how ANSI mode changes type conversion and arithmetic operations, and highlights the breaking changes from the default behavior.
Spark : Default Mode
# PySpark Example - Not ANSI SQL Type Safety and Arithmetic
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType

URL_MASTER = "spark://spark-master:7077"


spark = SparkSession.builder \
    .appName("SPARK_DefaultSQLMode") \
    .config("spark.sql.ansi.enabled", "false") \
    .master(URL_MASTER) \
    .getOrCreate()


print("""
##########################
##### Build Datasets #####
##########################
""")

# Source system 1: Financial dataset
# Demonstrates real-world scenarios where ANSI mode prevents errors
financial_data = [
    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),       # Standard loan
    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),       # Large principal
    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),       # High interest rate
    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"),  # Near integer max (overflow risk)
    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),       # Medium loan
    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),        # Zero interest (division edge case)
    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),      # Large loan
    ("LOAN008", 500, 15.00, 365, "2024-01-15"),        # Small principal, high rate
    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),       # Standard
    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),     # Very large principal (overflow in calculations)
]

# Source system 2: Problematic Financial dataset
# Test dataset with intentionally problematic values for type conversion
problematic_data = [
    ("CONV001", "12345", "2024-01-15"),    # Valid integer string
    ("CONV002", "67890", "2024-01-15"),    # Valid integer string
    ("CONV003", "abc123", "2024-01-15"),   # Invalid integer (contains letters)
    ("CONV004", "99999", "2024-01-15"),    # Valid integer
    ("CONV005", "12.34", "2024-01-15"),    # Decimal in integer field
    ("CONV006", "", "2024-01-15"),         # Empty string
    ("CONV007", "55555", "2024-01-15"),    # Valid integer
    ("CONV008", "1e5", "2024-01-15"),      # Scientific notation
    ("CONV009", "42", "2024-01-15"),       # Valid integer
    ("CONV010", "NULL", "2024-01-15"),     # String "NULL" not actual NULL
]

financial_schema = StructType([
    StructField("loan_id", StringType()),
    StructField("principal", IntegerType()),
    StructField("annual_rate", DoubleType()),
    StructField("days", IntegerType()),
    StructField("date", StringType()),
])

problematic_schema = StructType([
    StructField("record_id", StringType()),
    StructField("value_str", StringType()),
    StructField("date", StringType()),
])


# Create DataFrames
loans_data = spark.createDataFrame(financial_data, financial_schema)
print(f"financial_data dataset: {loans_data.count()}")
loans_data.show(truncate=False)


problems_data = spark.createDataFrame(problematic_data, problematic_schema)
print(f"problematic_data dataset: {problems_data.count()}")
problems_data.show(truncate=False)


# Register DataFrames as temporary views for SQL access
loans_data.createOrReplaceTempView("loans_data")
problems_data.createOrReplaceTempView("problems_data")


print("""
##############################################################
##### 1. Calculate daily interest using integer division #####
##############################################################
""")


# annual_rate is a percentage, hence the division by 100
# typeof() exposes the type Spark inferred for the expression
interest = spark.sql("""
    SELECT
        loan_id,
        principal,
        annual_rate,
        principal * annual_rate / 100 / days as daily_interest,
        typeof(principal * annual_rate / 100 / days) as result_type
    FROM loans_data
""")
interest.show(truncate=False)


print("""
########################################
##### 2. Integer Overflow Behavior #####
########################################
""")


overflow = spark.sql("""
    SELECT
        loan_id,
        principal,
        principal + 1000000 as principal_plus_1m,
        CASE
            WHEN principal + 1000000 < 0 THEN 'OVERFLOW DETECTED'
            ELSE 'OK'
        END as overflow_check
    FROM loans_data
    WHERE loan_id IN ('LOAN004', 'LOAN010')
""")

# LOAN004 (at INT max) wrapped to negative value
overflow.show(truncate=False)



print("""
####################################
##### 3. Type Casting Behavior #####
####################################
""")


cast = spark.sql("""
    SELECT
        record_id,
        value_str,
        CAST(value_str AS INT) as value_int,
        CASE
            WHEN CAST(value_str AS INT) IS NULL THEN 'CAST FAILED'
            ELSE 'CAST OK'
        END as cast_status
    FROM problems_data
    ORDER BY record_id
""")

# Invalid values became NULL without any error indication
cast.show(truncate=False)


spark.stop()
Result : Default Mode
##########################
##### Build Datasets #####
##########################

financial_data dataset: 10
+-------+----------+-----------+----+----------+
|loan_id|principal |annual_rate|days|date      |
+-------+----------+-----------+----+----------+
|LOAN001|10000     |5.25       |365 |2024-01-15|
|LOAN002|50000     |4.75       |365 |2024-01-15|
|LOAN003|1500      |12.5       |365 |2024-01-15|
|LOAN004|2147483647|3.0        |365 |2024-01-15|
|LOAN005|25000     |6.0        |365 |2024-01-15|
|LOAN006|8000      |0.0        |365 |2024-01-15|
|LOAN007|100000    |7.25       |365 |2024-01-15|
|LOAN008|500       |15.0       |365 |2024-01-15|
|LOAN009|75000     |4.25       |365 |2024-01-15|
|LOAN010|1000000   |3.5        |365 |2024-01-15|
+-------+----------+-----------+----+----------+

problematic_data dataset: 10
+---------+---------+----------+
|record_id|value_str|date      |
+---------+---------+----------+
|CONV001  |12345    |2024-01-15|
|CONV002  |67890    |2024-01-15|
|CONV003  |abc123   |2024-01-15|
|CONV004  |99999    |2024-01-15|
|CONV005  |12.34    |2024-01-15|
|CONV006  |         |2024-01-15|
|CONV007  |55555    |2024-01-15|
|CONV008  |1e5      |2024-01-15|
|CONV009  |42       |2024-01-15|
|CONV010  |NULL     |2024-01-15|
+---------+---------+----------+


##############################################################
##### 1. Calculate daily interest using integer division #####
##############################################################

+-------+----------+-----------+------------------+-----------+
|loan_id|principal |annual_rate|daily_interest    |result_type|
+-------+----------+-----------+------------------+-----------+
|LOAN001|10000     |5.25       |1.4383561643835616|double     |
|LOAN002|50000     |4.75       |6.506849315068493 |double     |
|LOAN003|1500      |12.5       |0.5136986301369864|double     |
|LOAN004|2147483647|3.0        |176505.5052328767 |double     |
|LOAN005|25000     |6.0        |4.109589041095891 |double     |
|LOAN006|8000      |0.0        |0.0               |double     |
|LOAN007|100000    |7.25       |19.863013698630137|double     |
|LOAN008|500       |15.0       |0.2054794520547945|double     |
|LOAN009|75000     |4.25       |8.732876712328768 |double     |
|LOAN010|1000000   |3.5        |95.89041095890411 |double     |
+-------+----------+-----------+------------------+-----------+


########################################
##### 2. Integer Overflow Behavior #####
########################################

+-------+----------+-----------------+-----------------+
|loan_id|principal |principal_plus_1m|overflow_check   |
+-------+----------+-----------------+-----------------+
|LOAN004|2147483647|-2146483649      |OVERFLOW DETECTED|
|LOAN010|1000000   |2000000          |OK               |
+-------+----------+-----------------+-----------------+


####################################
##### 3. Type Casting Behavior #####
####################################

+---------+---------+---------+-----------+
|record_id|value_str|value_int|cast_status|
+---------+---------+---------+-----------+
|CONV001  |12345    |12345    |CAST OK    |
|CONV002  |67890    |67890    |CAST OK    |
|CONV003  |abc123   |NULL     |CAST FAILED|
|CONV004  |99999    |99999    |CAST OK    |
|CONV005  |12.34    |12       |CAST OK    |
|CONV006  |         |NULL     |CAST FAILED|
|CONV007  |55555    |55555    |CAST OK    |
|CONV008  |1e5      |NULL     |CAST FAILED|
|CONV009  |42       |42       |CAST OK    |
|CONV010  |NULL     |NULL     |CAST FAILED|
+---------+---------+---------+-----------+
Spark : ANSI Mode
# PySpark Example - ANSI SQL Type Safety and Arithmetic
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType

URL_MASTER = "spark://spark-master:7077"


spark = SparkSession.builder \
    .appName("SPARK_ANSISQLMode") \
    .config("spark.sql.ansi.enabled", "true") \
    .master(URL_MASTER) \
    .getOrCreate()


print("""
##########################
##### Build Datasets #####
##########################
""")

# Source system 1: Financial dataset
# Demonstrates real-world scenarios where ANSI mode prevents errors
financial_data = [
    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),       # Standard loan
    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),       # Large principal
    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),       # High interest rate
    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"),  # Near integer max (overflow risk)
    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),       # Medium loan
    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),        # Zero interest (division edge case)
    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),      # Large loan
    ("LOAN008", 500, 15.00, 365, "2024-01-15"),        # Small principal, high rate
    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),       # Standard
    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),     # Very large principal (overflow in calculations)
]

# Source system 2: Problematic Financial dataset
# Test dataset with intentionally problematic values for type conversion
problematic_data = [
    ("CONV001", "12345", "2024-01-15"),    # Valid integer string
    ("CONV002", "67890", "2024-01-15"),    # Valid integer string
    ("CONV003", "abc123", "2024-01-15"),   # Invalid integer (contains letters)
    ("CONV004", "99999", "2024-01-15"),    # Valid integer
    ("CONV005", "12.34", "2024-01-15"),    # Decimal in integer field
    ("CONV006", "", "2024-01-15"),         # Empty string
    ("CONV007", "55555", "2024-01-15"),    # Valid integer
    ("CONV008", "1e5", "2024-01-15"),      # Scientific notation
    ("CONV009", "42", "2024-01-15"),       # Valid integer
    ("CONV010", "NULL", "2024-01-15"),     # String "NULL" not actual NULL
]

financial_schema = StructType([
    StructField("loan_id", StringType()),
    StructField("principal", IntegerType()),
    StructField("annual_rate", DoubleType()),
    StructField("days", IntegerType()),
    StructField("date", StringType()),
])

problematic_schema = StructType([
    StructField("record_id", StringType()),
    StructField("value_str", StringType()),
    StructField("date", StringType()),
])


# Create DataFrames
loans_data = spark.createDataFrame(financial_data, financial_schema)
print(f"financial_data dataset: {loans_data.count()}")
loans_data.show(truncate=False)


problems_data = spark.createDataFrame(problematic_data, problematic_schema)
print(f"problematic_data dataset: {problems_data.count()}")
problems_data.show(truncate=False)


# Register DataFrames as temporary views for SQL access
loans_data.createOrReplaceTempView("loans_data")
problems_data.createOrReplaceTempView("problems_data")


print("""
########################################
##### 1. Integer Overflow Behavior #####
########################################
""")


try:
    overflow = spark.sql("""
        SELECT
            loan_id,
            principal,
            principal + 1000000 as principal_plus_1m
        FROM loans_data
        WHERE loan_id IN ('LOAN004', 'LOAN010')
    """)
    overflow.show(truncate=False)
    print("Query completed (no overflow detected)")
except Exception as e:
    print(f"Exception Caught: {type(e).__name__}")
    print(f"Message: {str(e)[:200]}")
    print("\nThis is EXPECTED in ANSI mode - overflow is an error, not silent corruption")



print("""
####################################
##### 2. Type Casting Behavior #####
####################################
""")

try:
    cast = spark.sql("""
        SELECT
            record_id,
            value_str,
            CAST(value_str AS INT) as value_int
        FROM problems_data
        ORDER BY record_id
    """)
    cast.show(truncate=False)
except Exception as e:
    print(f"Exception Caught: {type(e).__name__}")
    print(f"Failed on record: {str(e)[str(e).find('CONV'):str(e).find('CONV')+10] if 'CONV' in str(e) else 'unknown'}")
    print("\nThis is EXPECTED in ANSI mode - forces explicit error handling")


print("\n##### with TRY_CAST: Safe Error Handling #####")
try_cast = spark.sql("""
    SELECT
        record_id,
        value_str,
        TRY_CAST(value_str AS INT) as value_int,
        CASE
            WHEN TRY_CAST(value_str AS INT) IS NULL THEN 'CONVERSION_ERROR'
            ELSE 'OK'
        END as validation_status
    FROM problems_data
    ORDER BY record_id
""")
# TRY_CAST provides NULL on error but explicitly marks rows as problematic
try_cast.show(truncate=False)


spark.stop()
Result : ANSI Mode
##########################
##### Build Datasets #####
##########################

financial_data dataset: 10
+-------+----------+-----------+----+----------+
|loan_id|principal |annual_rate|days|date      |
+-------+----------+-----------+----+----------+
|LOAN001|10000     |5.25       |365 |2024-01-15|
|LOAN002|50000     |4.75       |365 |2024-01-15|
|LOAN003|1500      |12.5       |365 |2024-01-15|
|LOAN004|2147483647|3.0        |365 |2024-01-15|
|LOAN005|25000     |6.0        |365 |2024-01-15|
|LOAN006|8000      |0.0        |365 |2024-01-15|
|LOAN007|100000    |7.25       |365 |2024-01-15|
|LOAN008|500       |15.0       |365 |2024-01-15|
|LOAN009|75000     |4.25       |365 |2024-01-15|
|LOAN010|1000000   |3.5        |365 |2024-01-15|
+-------+----------+-----------+----+----------+

problematic_data dataset: 10
+---------+---------+----------+
|record_id|value_str|date      |
+---------+---------+----------+
|CONV001  |12345    |2024-01-15|
|CONV002  |67890    |2024-01-15|
|CONV003  |abc123   |2024-01-15|
|CONV004  |99999    |2024-01-15|
|CONV005  |12.34    |2024-01-15|
|CONV006  |         |2024-01-15|
|CONV007  |55555    |2024-01-15|
|CONV008  |1e5      |2024-01-15|
|CONV009  |42       |2024-01-15|
|CONV010  |NULL     |2024-01-15|
+---------+---------+----------+


########################################
##### 1. Integer Overflow Behavior #####
########################################

== SQL (line 5, position 13) ==
    principal + 1000000 as principal_plus_1m
    ^^^^^^^^^^^^^^^^^^^

    at org.apache.spark.sql.errors.ExecutionErrors.arithmeticOverflowError(ExecutionErrors.scala:132)
    ...
    at java.base/java.lang.Thread.run(Thread.java:1583)


Exception Caught: ArithmeticException
Message: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22003
== SQL (l

This is EXPECTED in ANSI mode - overflow is an error, not silent corruption

####################################
##### 2. Type Casting Behavior #####
####################################

== SQL (line 5, position 13) ==
    CAST(value_str AS INT) as value_int
    ^^^^^^^^^^^^^^^^^^^^^^

    at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
    ...
    at java.base/java.lang.Thread.run(Thread.java:1583)

Exception Caught: NumberFormatException
Failed on record: unknown

This is EXPECTED in ANSI mode - forces explicit error handling

##### with TRY_CAST: Safe Error Handling #####

+---------+---------+---------+-----------------+
|record_id|value_str|value_int|validation_status|
+---------+---------+---------+-----------------+
|CONV001  |12345    |12345    |OK               |
|CONV002  |67890    |67890    |OK               |
|CONV003  |abc123   |NULL     |CONVERSION_ERROR |
|CONV004  |99999    |99999    |OK               |
|CONV005  |12.34    |NULL     |CONVERSION_ERROR |
|CONV006  |         |NULL     |CONVERSION_ERROR |
|CONV007  |55555    |55555    |OK               |
|CONV008  |1e5      |NULL     |CONVERSION_ERROR |
|CONV009  |42       |42       |OK               |
|CONV010  |NULL     |NULL     |CONVERSION_ERROR |
+---------+---------+---------+-----------------+
Session SQL Variables
Introduction
Session SQL variables enable dynamic parameterization of queries without string concatenation or external configuration files. Variables are session-scoped and type-safe.
Detail
Before Spark 4.0, parameterizing SQL queries required three problematic approaches:
- String Interpolation in Application Code
- Risk: SQL injection if country comes from user input. Requires recompiling the query plan for each parameter value.
- Example :
country = "US"
df = spark.sql(f"SELECT * FROM orders WHERE country = '{country}'")
- Spark Configuration Properties
- Verbose syntax. Limited to string types. Configuration namespace pollution.
- Example :
spark.conf.set("spark.my.param.pays", "US")
spark.sql("SELECT * FROM orders WHERE country = '${spark.my.param.pays}'")
- External Configuration Files
- Adds dependency on external systems. Configuration drift between environments. No type safety.
- Example :
import yaml

params = yaml.safe_load(open("config.yaml"))
spark.sql(f"SELECT * FROM orders WHERE country = '{params['country']}'")
Spark 4.0 introduces SQL variables that solve these problems. Variables are declared with explicit types and scoped to the session:
DECLARE min_amount DECIMAL(10,2) DEFAULT 100.0;
SELECT * FROM orders WHERE amount >= min_amount;
The two supported syntaxes:

- DECLARE Statement (SQL Standard)
- Explicit type declaration. Required in ANSI SQL mode.
- Example :

DECLARE variable_name TYPE [DEFAULT value];

- SET Statement (Backwards Compatible)
- Type inferred from value. The SET VAR form distinguishes variables from Spark 3.x-style session properties. Variables are type-safe once set.
- Example :

SET VAR variable_name = value;
The reference is resolved during query compilation, not at runtime. This lets the Catalyst optimizer use variable values for predicate pushdown and partition pruning.
The variable system maintains a session-level symbol table. Each variable has:
- Name: Identifier used in queries
- Type: Scalar data type (INT, STRING, DECIMAL, BOOLEAN, DATE, TIMESTAMP)
- Value: Current value (mutable with subsequent SET)
- Scope: Session-only (not visible to other sessions)
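A minimal lifecycle sketch covering these four properties (the variable name is illustrative):

```sql
DECLARE VARIABLE threshold DOUBLE DEFAULT 30.0;   -- name, type, and initial value
SET VAR threshold = 35.0;                         -- value is mutable
SELECT threshold;                                 -- readable in any query of this session
DROP TEMPORARY VARIABLE threshold;                -- removed from the session symbol table
```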
Variables integrate with query planning. When Catalyst analyzes a query that references variable_name, it:
- Looks up variable in session symbol table
- Replaces reference with literal value
- Proceeds with optimization using concrete value
Runtime performance is identical to hardcoded literals.
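Because the literal is substituted at analysis time, a filter on a partition column can still prune partitions; a sketch with an assumed events table partitioned by event_date:

```sql
DECLARE VARIABLE target_date DATE DEFAULT DATE'2024-01-15';

-- Catalyst replaces target_date with DATE'2024-01-15' during planning,
-- so partition pruning applies exactly as with a hardcoded literal
SELECT * FROM events WHERE event_date = target_date;
```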
Advantages
- Type-Safe Parameterization : Variables have explicit types checked at declaration. Assigning wrong type produces compilation error. Prevents runtime type errors from misconfigured parameters.
- SQL Injection Prevention : Variable references are resolved during query compilation, not runtime string substitution. Malicious input in variable values cannot alter query structure.
- Simplified Multi-Environment Configuration : The same SQL script runs in dev, staging, and prod with different variable values. No code changes between environments. Variables can be set from command-line arguments or environment variables at session startup.
- Optimizer-Aware Parameterization : Catalyst optimizer sees variable values during planning. Enables partition pruning and predicate pushdown based on variable values.
Limitations
- Session-Scoped Only : Variables don’t persist after session termination. No global variables shared across concurrent sessions. Each client must redeclare variables. Unsuitable for cluster-wide configuration that needs consistency across jobs.
- No Complex Types : Variables support only scalar types. Cannot declare ARRAY, STRUCT, or MAP variables. Passing complex parameter values still requires configuration properties or external storage.
- No Variable Persistence APIs : Variables cannot be saved to the metastore or external storage. No built-in mechanism to load variables from YAML/JSON files. Teams must build custom variable management on top of base SQL variables.
Real-World Use Cases
- Use Case 1 : Parameterized Reports
- Define a date range at the beginning of the script and use it in multiple queries without repetition.
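A sketch of this pattern, with hypothetical orders and sales tables:

```sql
DECLARE VARIABLE report_start DATE DEFAULT DATE'2024-01-01';
DECLARE VARIABLE report_end   DATE DEFAULT DATE'2024-01-31';

-- The same range drives every query of the report
SELECT COUNT(*)    FROM orders WHERE order_date BETWEEN report_start AND report_end;
SELECT SUM(amount) FROM sales  WHERE sale_date  BETWEEN report_start AND report_end;
```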
- Use Case 2 : Environment-Specific Data Processing
- An ETL pipeline runs in dev, staging, and production environments. Each environment reads from different database prefixes (dev_db.orders, prod_db.orders). Variables source_database and target_database are set from environment variables at job startup. The same SQL script works across all environments without modification.
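One way to sketch this pattern: the IDENTIFIER clause lets a variable supply an object name (database and table names here are assumptions):

```sql
DECLARE VARIABLE source_database STRING DEFAULT 'dev_db';

-- In production the variable would be set from an environment value at startup:
-- SET VAR source_database = 'prod_db';
SELECT * FROM IDENTIFIER(source_database || '.orders');
```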
Codes
Example : How SQL session variables work
Spark
# PySpark Example : Demonstrates how session variables work
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import to_timestamp

URL_MASTER = "spark://spark-master:7077"


spark = SparkSession.builder \
    .appName("SPARK_APPVARIABLE") \
    .config("spark.sql.ansi.enabled", "true") \
    .master(URL_MASTER) \
    .getOrCreate()


print("""
##########################
##### Build Datasets #####
##########################
""")

# Source system 1: Sensor dataset
# Sensor readings with normal and anomalous values for time-window analysis
sensor_readings = [
    ("SENSOR_01", "2024-01-15 10:00:00", 22.5, 45.2, "warehouse_A", "normal"),
    ("SENSOR_02", "2024-01-15 10:05:00", 35.8, 78.5, "warehouse_A", "normal"),  # High temp
    ("SENSOR_03", "2024-01-15 10:10:00", 18.2, 42.1, "warehouse_B", "normal"),
    ("SENSOR_01", "2024-01-15 10:15:00", 23.1, 46.8, "warehouse_A", "normal"),
    ("SENSOR_02", "2024-01-15 10:20:00", 38.5, 82.3, "warehouse_A", "alert"),   # Anomaly
    ("SENSOR_04", "2024-01-15 10:25:00", 15.2, 38.5, "warehouse_C", "normal"),  # Low temp
    ("SENSOR_03", "2024-01-15 10:30:00", 19.1, 43.7, "warehouse_B", "normal"),
    ("SENSOR_01", "2024-01-15 10:35:00", 24.2, 47.5, "warehouse_A", "normal"),
    ("SENSOR_04", "2024-01-15 10:40:00", 12.8, 35.2, "warehouse_C", "alert"),   # Anomaly
    ("SENSOR_02", "2024-01-15 10:45:00", 32.1, 72.5, "warehouse_A", "normal"),
]


sensor_schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("cur_timestamp", StringType()),
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("location", StringType()),
    StructField("status", StringType()),
])


# Create DataFrame
sensors_df = spark.createDataFrame(sensor_readings, sensor_schema) \
    .withColumn("cur_timestamp", to_timestamp("cur_timestamp", format="yyyy-MM-dd HH:mm:ss"))
print(f"sensor dataset: {sensors_df.count()}")
sensors_df.show(truncate=False)

# Register DataFrame as a temporary view for SQL access
sensors_df.createOrReplaceTempView("sensors_data")



print("""
########################################
##### 1. Time-Based Variable Usage #####
########################################
""")

spark.sql("DECLARE VARIABLE analysis_start TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:00:00'")
spark.sql("DECLARE VARIABLE analysis_end TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:15:00'")

query_time_based = """
    SELECT
        sensor_id,
        location,
        COUNT(*) as readings_in_window,
        AVG(temperature) as avg_temp,
        MAX(temperature) - MIN(temperature) as temp_range,
        analysis_start as window_start,
        analysis_end as window_end
    FROM sensors_data
    WHERE cur_timestamp BETWEEN analysis_start AND analysis_end
    GROUP BY sensor_id, location
    ORDER BY sensor_id, location;
"""

print("EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00")
time_based = spark.sql(query_time_based)
print(f"Result records: {time_based.count()}")
time_based.show(truncate=False)


print("EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00")
spark.sql("SET VARIABLE analysis_end = TIMESTAMP'2024-01-15 10:30:00'")
time_based = spark.sql(query_time_based)
print(f"Result records: {time_based.count()}")
time_based.show(truncate=False)



print("""
################################################
##### 2. Variables vs String Interpolation #####
################################################
""")
spark.sql("DECLARE VARIABLE target_location STRING DEFAULT 'warehouse_A'")

print("UNSAFE: String interpolation (vulnerable to SQL injection)")
def get_user_input() -> str:
    return "warehouse_A' OR '1'='1"

target_location = get_user_input()
unsafe_interpolation = spark.sql(f"SELECT * FROM sensors_data WHERE location = '{target_location}'")
print(f"Result records: {unsafe_interpolation.count()}")
unsafe_interpolation.show(truncate=False)


print("SAFE: Variable substitution (injection-proof)")
# Malicious input becomes a string value, not SQL code
spark.sql("SET VARIABLE target_location = \"warehouse_A' OR '1'='1\"")
safe_interpolation = spark.sql("SELECT * FROM sensors_data WHERE location = target_location")
print(f"Result records: {safe_interpolation.count()}")
safe_interpolation.show(truncate=False)
# The variable value is treated as the string literal "warehouse_A' OR '1'='1"



spark.stop()
Result
##########################
##### Build Datasets #####
##########################

sensor dataset: 10
+---------+-------------------+-----------+--------+-----------+------+
|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
+---------+-------------------+-----------+--------+-----------+------+
|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
+---------+-------------------+-----------+--------+-----------+------+


########################################
##### 1. Time-Based Variable Usage #####
########################################

EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00
Result records: 3
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:15:00|
|SENSOR_02|warehouse_A|1                 |35.8    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
|SENSOR_03|warehouse_B|1                 |18.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+

EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00
Result records: 4
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_02|warehouse_A|2                 |37.15   |2.700000000000003 |2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_03|warehouse_B|2                 |18.65   |0.9000000000000021|2024-01-15 10:00:00|2024-01-15 10:30:00|
|SENSOR_04|warehouse_C|1                 |15.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:30:00|
+---------+-----------+------------------+--------+------------------+-------------------+-------------------+


################################################
##### 2. Variables vs String Interpolation #####
################################################

UNSAFE: String interpolation (vulnerable to SQL injection)
Result records: 10
+---------+-------------------+-----------+--------+-----------+------+
|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
+---------+-------------------+-----------+--------+-----------+------+
|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
+---------+-------------------+-----------+--------+-----------+------+

SAFE: Variable substitution (injection-proof)
Result records: 0
+---------+-------------+-----------+--------+--------+------+
|sensor_id|cur_timestamp|temperature|humidity|location|status|
+---------+-------------+-----------+--------+--------+------+
+---------+-------------+-----------+--------+--------+------+
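In the unsafe run above, interpolation returns all 10 rows because the injected input rewrites the predicate itself. A minimal pure-Python sketch (no Spark required; `build_query` is a hypothetical helper) makes the mechanism visible:

```python
def build_query(location: str) -> str:
    # Naive f-string interpolation: user input is pasted straight into the SQL text.
    return f"SELECT * FROM sensors_data WHERE location = '{location}'"

# Benign input produces the intended predicate.
print(build_query("warehouse_A"))
# → SELECT * FROM sensors_data WHERE location = 'warehouse_A'

# Malicious input closes the string literal and appends a tautology,
# so the WHERE clause matches every row.
print(build_query("warehouse_A' OR '1'='1"))
# → SELECT * FROM sensors_data WHERE location = 'warehouse_A' OR '1'='1'
```

With a session variable, the malicious text stays a plain string value and never reaches the parser as SQL, which is why the safe run returns 0 records. PySpark 3.4+ also offers parameterized queries (`spark.sql(query, args=...)`) as another injection-safe alternative.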
Multi-Statement SQL Scripts
Introduction
Multi-statement SQL scripts enable executing multiple SQL commands in a single script file with procedural control flow. Supports variable declarations, conditional logic (IF/ELSE), loops (WHILE, FOR), exception handling (BEGIN/END blocks), and statement sequencing.
Detail
Before Spark 4.0, executing multiple SQL statements required either:
- Application-Level Orchestration : Each statement is a separate API call. No transactional semantics. Failure handling requires try-catch blocks in application code.
spark.sql("CREATE TABLE staging AS SELECT * FROM source")
spark.sql("DELETE FROM staging WHERE invalid = true")
spark.sql("INSERT INTO target SELECT * FROM staging")
- External Workflow Tools : Adds operational complexity. Requires separate orchestration infrastructure. Simple SQL logic becomes Python/YAML configuration.
- Multiple Spark SQL Invocations : No state sharing between invocations. Cannot pass variables. Poor performance due to repeated session initialization.
Spark 4.0 introduces multi-statement scripts that execute as a single unit. Scripts contain multiple SQL statements separated by semicolons, with procedural control flow.
The script parser breaks input into individual statements and builds an execution plan. Each statement executes sequentially. Variables declared in early statements are available to later statements. Control flow statements (IF/ELSE, WHILE) alter execution order based on runtime conditions.
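The parsing step can be pictured with a deliberately naive sketch (pure Python, hypothetical helper; Spark's real parser also has to handle semicolons inside string literals, comments, and nested BEGIN/END blocks):

```python
def split_statements(script: str) -> list[str]:
    # Naive split on semicolons, dropping empty fragments.
    return [s.strip() for s in script.split(";") if s.strip()]

script = """
DECLARE cnt INT DEFAULT 0;
SET cnt = (SELECT COUNT(*) FROM raw_orders);
SELECT cnt;
"""
for i, stmt in enumerate(split_statements(script), start=1):
    print(f"statement {i}: {stmt}")
# statement 1: DECLARE cnt INT DEFAULT 0
# statement 2: SET cnt = (SELECT COUNT(*) FROM raw_orders)
# statement 3: SELECT cnt
```

Each resulting statement is then planned and executed in order, with earlier variable declarations visible to later statements.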
- Scripts execute on the Spark driver.
- Control flow is evaluated on the driver, not distributed to executors.
- Each statement goes through normal Catalyst optimization and execution.
- The execution model is sequential, not transactional : If statement 3 fails, statements 1 and 2 have already committed their changes. There is no automatic rollback. This differs from traditional database stored procedures which often provide transaction semantics.
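Because there is no rollback, failure handling has to be explicit. A hedged sketch of the compensating-cleanup pattern (the `run_sql` callable stands in for `spark.sql`; table names are illustrative):

```python
def run_pipeline(run_sql) -> None:
    # Statements commit one by one; a later failure does NOT undo earlier ones.
    statements = [
        "CREATE TABLE staging AS SELECT * FROM source",
        "DELETE FROM staging WHERE invalid = true",
        "INSERT INTO target SELECT * FROM staging",
    ]
    try:
        for stmt in statements:
            run_sql(stmt)
    finally:
        # Compensating cleanup: drop the staging table whether or not
        # the pipeline reached the final INSERT.
        run_sql("DROP TABLE IF EXISTS staging")
```

With a real session this would be called as `run_pipeline(spark.sql)`; the `finally` block is what a traditional database would have given you for free via transaction rollback.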
Key language constructs :
- Variable Declaration and Assignment
- Conditional Execution : IF, CASE
- Iterative Execution : WHILE, FOR, REPEAT
- Exception Handling
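These constructs use the usual compound-statement syntax. A short illustrative sketch (variable and message names are hypothetical, following the Spark 4.x SQL scripting syntax):

```sql
BEGIN
  DECLARE env STRING DEFAULT 'dev';
  DECLARE day_offset INT DEFAULT 0;

  -- Conditional execution
  IF env = 'prod' THEN
    SELECT 'production settings' AS info;
  ELSE
    SELECT 'development settings' AS info;
  END IF;

  -- Iterative execution
  WHILE day_offset < 3 DO
    SET day_offset = day_offset + 1;
  END WHILE;
END;
```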
Advantages
- Reduced External Dependencies : Simple ETL pipelines no longer require specific orchestration tools. SQL scripts can implement validation, transformation, and loading logic in a single file. Reduces operational complexity and infrastructure costs.
- Procedural Logic in SQL : IF/ELSE and WHILE constructs enable business logic implementation directly in SQL. Reduces context switching between SQL and Python/Scala. Teams with strong SQL skills can build complete pipelines without application code.
- Version Control and Portability : Scripts are plain text files that live in version control. Changes are tracked through Git. Scripts are portable across Spark deployments (local, cluster, Connect). Easier to review and audit than application code spread across multiple files.
- Server-Side Execution : With Spark Connect, the entire script is sent to the server and executes remotely. Reduces network round-trips compared to per-statement submission. The client only receives final results, not intermediate state.
Limitations
- No Transaction Rollback : Failed statements do not automatically undo previous statements. If script creates staging table, loads data, then fails on validation, the staging table remains. Manual cleanup required. This is fundamentally different from ACID databases.
- Limited Debugging Capabilities : No interactive debugger. No breakpoints. No variable inspection during execution. Debugging requires adding SELECT statements to print intermediate values. Error messages may not clearly indicate which statement failed in complex scripts.
- Complexity Threshold : Complex logic is better implemented in application code with proper testing frameworks. Scripts should remain linear data pipelines, not full applications.
- Error Handling Gaps : Cannot catch specific error types with granularity of try-catch in Python/Scala. Cannot continue execution after certain errors. Script failure model is all-or-nothing for many error types.
Real-World Use Cases
- Use Case 1 : Incremental Loading
- A loop that processes 24 hourly partitions one by one.
- A script that adds new partitions for the next 30 days, checking whether each one already exists.
- Use Case 2 : Environment-Specific Deployment
- Deployment scripts must create different table structures in dev vs prod (dev has additional debug columns). The script uses IF/ELSE based on an environment variable to execute the appropriate CREATE TABLE statements. A single script replaces separate dev and prod SQL files.
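Use Case 1 can also be sketched as a driver-side generator of the per-day statements (pure Python; the `events` table and `dt` partition column are illustrative):

```python
from datetime import date, timedelta

def partition_statements(start: date, days: int = 30) -> list[str]:
    # One ADD PARTITION statement per day; IF NOT EXISTS makes the
    # script safe to re-run when some partitions already exist.
    stmts = []
    for offset in range(days):
        d = start + timedelta(days=offset)
        stmts.append(
            f"ALTER TABLE events ADD IF NOT EXISTS PARTITION (dt = '{d.isoformat()}')"
        )
    return stmts

for stmt in partition_statements(date(2024, 1, 15), days=3):
    print(stmt)
```

In a pure SQL script the same idea is a WHILE loop over a day-offset variable; the Python form is shown here only because it is easy to test in isolation.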
Codes
Example : Execute a multi-statement SQL script with Spark Connect
Spark
Content of the Spark Connect client script :
from pyspark.sql import SparkSession

REMOTE_URL = "sc://localhost:15002"
SCRIPT_SQL_NAME = "example_SQL.sql"

# Connection to Spark Connect
spark = SparkSession.builder \
    .remote(REMOTE_URL) \
    .appName("SPARK_SQLScript") \
    .config("spark.sql.ansi.enabled", "true") \
    .getOrCreate()

print("Successfully connected to Spark!")
print(f"Spark version: {spark.version}")

# Read the SQL script
with open(SCRIPT_SQL_NAME, "r", encoding="utf-8") as f:
    sql_script = f.read()

# Execute the whole SQL script with a single spark.sql() call
result = spark.sql(sql_script)
result.show()
Content of the example_SQL.sql SQL script file :
-- Spark 4.x - SQL Scripting

BEGIN
  -- =============================================
  -- STEP n°1 : VARIABLES
  -- =============================================
  DECLARE batch_date DATE DEFAULT CURRENT_DATE();

  -- Business thresholds
  DECLARE min_valid_amount DECIMAL(10,2) DEFAULT 10.00;
  DECLARE standard_threshold DECIMAL(10,2) DEFAULT 100.00;
  DECLARE premium_threshold DECIMAL(10,2) DEFAULT 1000.00;

  -- Metrics
  DECLARE total_processed INT DEFAULT 0;
  DECLARE total_invalid INT DEFAULT 0;
  DECLARE total_revenue DECIMAL(15,2) DEFAULT 0.0;

  -- =============================================
  -- STEP n°2 : TABLES INITIALIZATION
  -- =============================================

  -- Staging table for raw data
  DROP TABLE IF EXISTS raw_orders;
  CREATE TABLE raw_orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2),
    status STRING,
    region STRING
  ) USING parquet;

  -- Table of validated orders
  DROP TABLE IF EXISTS validated_orders;
  CREATE TABLE validated_orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2),
    region STRING,
    order_category STRING,
    loyalty_points DECIMAL(10,2),
    processing_date DATE
  ) USING parquet;

  -- Quarantine table for invalid data
  DROP TABLE IF EXISTS invalid_orders;
  CREATE TABLE invalid_orders (
    order_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    error_reason STRING,
    quarantine_timestamp TIMESTAMP
  ) USING parquet;

  -- =============================================
  -- STEP n°3 : DATASETS
  -- =============================================

  INSERT INTO raw_orders VALUES
    (1001, 501, DATE'2024-01-15', 150.50, 'completed', 'EMEA'),
    (1002, 502, DATE'2024-01-15', 1250.00, 'completed', 'AMER'),
    (1003, 503, DATE'2024-01-15', -50.00, 'completed', 'APAC'),  -- Invalid: negative
    (1004, 504, DATE'2024-01-15', 890.00, 'completed', 'EMEA'),
    (2001, 505, DATE'2024-01-16', 2100.00, 'completed', 'AMER'),
    (2002, 506, DATE'2024-01-16', NULL, 'completed', 'APAC'),    -- Invalid: NULL amount
    (2003, 507, DATE'2024-01-16', 450.00, 'completed', 'EMEA'),
    (2004, 508, DATE'2024-01-16', 5.00, 'completed', 'AMER'),    -- Invalid: below minimum
    (3001, 509, DATE'2024-01-17', 750.00, 'completed', 'APAC'),
    (3002, 510, DATE'2024-01-17', 1500.00, 'completed', 'EMEA'),
    (3003, 511, DATE'2024-01-17', 220.00, 'completed', 'AMER');

  -- =============================================
  -- STEP n°4 : DATA VALIDATION
  -- =============================================

  -- Quarantining invalid records
  INSERT INTO invalid_orders
  SELECT
    order_id,
    customer_id,
    amount,
    CASE
      WHEN amount IS NULL THEN 'NULL_AMOUNT'
      WHEN amount < 0 THEN 'NEGATIVE_AMOUNT'
      WHEN amount < min_valid_amount THEN 'BELOW_MINIMUM'
      ELSE 'UNKNOWN_ERROR'
    END AS error_reason,
    CURRENT_TIMESTAMP() AS quarantine_timestamp
  FROM raw_orders
  WHERE status = 'completed'
    AND (amount IS NULL
      OR amount < 0
      OR amount < min_valid_amount);

  -- Counting invalid rows
  SET total_invalid = (SELECT COUNT(*) FROM invalid_orders);

  -- =============================================
  -- STEP n°5 : VALID DATA TRANSFORMATION
  -- =============================================

  INSERT INTO validated_orders
  SELECT
    order_id,
    customer_id,
    order_date,
    amount,
    region,
    -- CASE WHEN classification
    CASE
      WHEN amount >= premium_threshold THEN 'PREMIUM'
      WHEN amount >= standard_threshold THEN 'STANDARD'
      ELSE 'BASIC'
    END AS order_category,
    -- Calculation of loyalty points
    CASE
      WHEN amount >= premium_threshold THEN amount * 0.10
      WHEN amount >= standard_threshold THEN amount * 0.05
      ELSE amount * 0.02
    END AS loyalty_points,
    batch_date AS processing_date
  FROM raw_orders
  WHERE status = 'completed'
    AND amount IS NOT NULL
    AND amount >= min_valid_amount;

  -- Counting total rows processed
  SET total_processed = (SELECT COUNT(*) FROM validated_orders WHERE processing_date = batch_date);

  -- Revenue calculation
  SET total_revenue = (SELECT COALESCE(SUM(amount), 0) FROM validated_orders WHERE processing_date = batch_date);


  -- =============================================
  -- STEP n°6 : GENERATION OF FINAL REPORTS
  -- =============================================
  SELECT '========== EXECUTION - SUMMARY ==========' AS message;

  -- Execution report
  SELECT 'Batch Date' AS `Metric`, CAST(batch_date AS STRING) AS `Value`
  UNION ALL SELECT 'Total Valid Orders', CAST(total_processed AS STRING)
  UNION ALL SELECT 'Total Invalid Orders', CAST(total_invalid AS STRING)
  UNION ALL SELECT 'Total Revenue', CAST(total_revenue AS STRING)
  UNION ALL SELECT 'Invalid Ratio %', CAST(ROUND((total_invalid * 100.0) / GREATEST(total_processed + total_invalid, 1), 2) AS STRING);

  -- =============================================
  -- STEP n°7 : DROP TABLES
  -- =============================================
  DROP TABLE IF EXISTS raw_orders;
  DROP TABLE IF EXISTS validated_orders;
  DROP TABLE IF EXISTS invalid_orders;

END;
Result
Successfully connected to Spark!
Spark version: 4.1.1
+--------------------+----------+
|              Metric|     Value|
+--------------------+----------+
|          Batch Date|2026-03-10|
|  Total Valid Orders|         8|
|Total Invalid Orders|         3|
|       Total Revenue|   7310.50|
|     Invalid Ratio %|     27.27|
+--------------------+----------+
