Spark : v4.x - Fonctionnalités - Scripts SQL, Session et Conformité ANSI

Vous trouverez dans cet article, des informations sur les fonctionnalité SQL (Script, Session, Conformité) à partir de Spark v4.x.

Conformité ANSI SQL

Introduction

It enables type safety, prevents implicit casting errors and enforces reserved keyword restrictions.

Le mode de conformité ANSI SQL applique une sémantique SQL plus stricte correspondant à la norme ISO/IEC 9075. Cela permet de limiter les erreurs de typage, empêcher les erreurs de conversion implicite et appliquer les restrictions sur les mots-clés réservés.

Détail

Le comportement SQL par défaut de Spark privilégie la facilité d’utilisation plutôt que la rigueur. Les conversions de types implicites réussissent silencieusement même lorsqu’elles sont sémantiquement incorrectes. La division d’entiers retourne des entiers (troncature). Les opérations de dépassement de capacité bouclent sans erreur. Les mots-clés réservés peuvent être utilisés comme noms de colonnes sans guillemets.

La conformité ANSI SQL modifie fondamentalement ces comportements pour correspondre aux bases de données traditionnelles comme PostgreSQL, SQL Server et autres. Les changements affectent trois domaines principaux :

  • Rigueur du système de types : Les requêtes échouent au lieu de retourner silencieusement des résultats tronqués ou null
  • Gestion des NULL : Le mode ANSI applique une sémantique NULL plus stricte dans les prédicats et les opérations ensemblistes. NULL = NULL retourne toujours NULL (pas vrai). UNION, INTERSECT et EXCEPT traitent les valeurs NULL comme égales pour la déduplication (conformément à la norme SQL).
  • Mots-clés réservés : Le mode ANSI applique les restrictions sur les mots-clés réservés au SQL. Les noms de colonnes comme SELECT, FROM, WHERE doivent être entourés de backticks (guillemets inversés). Cela correspond au comportement des bases de données traditionnelles mais casse les requêtes Spark SQL qui utilisent ces noms librement.

L’implémentation affecte plusieurs composants de Spark :

  • Optimiseur Catalyst : Ajoute des règles de validation de types pendant la phase d’analyse. Rejette les requêtes avec des incompatibilités de types plus tôt dans la planification.
  • Génération de code : Modifie le code généré pour les opérations arithmétiques afin d’inclure des vérifications de dépassement de capacité. Ajoute des instructions de branchement qui vérifient les conditions de dépassement avant de retourner les résultats.
  • Exécution à la volée (runtime) : Encapsule les opérations dans des gestionnaires d’exceptions qui font échouer les tâches en cas de violations ANSI. Cela change la gestion des erreurs d’une propagation silencieuse de NULL à des échecs explicites.

Le mode est configuré par session via spark.sql.ansi.enabled. Une fois activé, toutes les requêtes de cette session suivent les règles ANSI.

Conseils

  • spark.sql.ansi.enabled = true :
    • Interrupteur principal pour la conformité ANSI. Active tous les comportements stricts : vérification du dépassement de capacité, sûreté des types, sémantique de division.
    • Doit être défini avant la création de la session pour un comportement cohérent.
    • Ne peut pas être modifié en cours de session dans certains modes de déploiement Spark.
  • TRY_CAST vs CAST en mode ANSI :
    • CAST() lève des exceptions en cas d’erreurs de conversion en mode ANSI. TRY_CAST() retourne NULL mais signale explicitement l’échec de validation.
    • Utilisez TRY_CAST() pour les pipelines de validation de données où les lignes incorrectes doivent être filtrées/journalisées plutôt que de faire échouer la tâche entière.
  • Fonction typeof() :
    • Retourne le type de données d’un résultat d’expression. Indique si le résultat est INT, DECIMAL, DOUBLE, etc. À utiliser pour identifier les changements de type (dans le cas d’une migration par exemple).
  • Guillemets pour les mots-clés réservés :
    • Le mode ANSI impose l’utilisation de backticks (guillemets inversés) pour les mots réservés. Les requêtes avec des noms de colonnes comme user, timestamp, order doivent être réécrites. Il s’agit d’un changement critique pour les grandes bases de code SQL.
    • Exécutez une analyse statique pour identifier les requêtes affectées avant la migration.

Avantages

  1. Amélioration de la sureté des types : La vérification stricte des types détecte les erreurs lors de la compilation de la requête plutôt qu’à l’exécution. Les problèmes d’évolution de schéma apparaissent plus tôt. Empêche les problèmes de qualité des données de se propager par les pipelines de données.
  2. Détection d’erreurs rapide : Les incompatibilités de types et les erreurs arithmétiques échouent immédiatement au lieu de produire silencieusement des résultats incorrects. Un pipeline qui traite des transactions financières ne corrompra pas silencieusement les données en cas de dépassement de capacité. Les erreurs apparaissent pendant les tests plutôt qu’en production.
  3. Compatibilité de migration de base de données : Les requêtes qui fonctionnaient correctement dans la base de données source se comportent de manière identique dans Spark. (Réduit le risque de migration et la charge de test)
  4. Conformité réglementaire : Les secteurs des services financiers et de la santé exigent un comportement de requête déterministe et auditable. Le mode ANSI fournit une sémantique prévisible qui correspond aux attentes réglementaires.

Limitations

  1. Modifications incompatibles avec les requêtes existantes : Les requêtes qui s’appuyaient sur le comportement permissif de Spark échoueront. Les opérations CAST qui retournaient précédemment NULL lèvent maintenant des exceptions. Les résultats de division d’entiers changent de 0 à 0,5. Les grandes bases de code nécessitent des tests et une refactorisation approfondis.
  2. Surcoût en terme de performance : Chaque addition, multiplication et division inclut des branches conditionnelles pour vérifier le dépassement de capacité. Les traitements avec beaucoup de calcul sur un volume important de donnée peut subir une dégradation de performance.
  3. Configuration tout ou rien : Impossible d’activer le mode ANSI pour des requêtes ou des tables spécifiques. Le paramètre au niveau de la session affecte toutes les opérations. Une migration progressive nécessite l’exécution de deux sessions Spark distinctes avec des configurations différentes.
  4. Les erreurs peuvent faire échouer des tâches entières : Une seule ligne avec une erreur de conversion de type fait échouer la tâche entière. En mode par défaut, les lignes incorrectes deviennent NULL et le traitement continue. Le mode ANSI manque d’une gestion d’erreur au niveau de la ligne. Nécessite une pré-validation ou un pipeline de gestion d’erreurs distinct.

Cas d’usages (concret)

  • Cas d’usage 1 : Migration de PostgreSQL vers Spark
    • Une entreprise migre plus de 500 requêtes SQL d’un entrepôt de données PostgreSQL vers un cluster Spark. Les requêtes contiennent des divisions d’entiers, des calculs sujets au dépassement de capacité et des dépendances de types strictes. Le mode ANSI permet d’exécuter les requêtes sans modification. Sans le mode ANSI, un pourcentage important des requêtes peut nécessiter une réécriture pour correspondre à la sémantique de PostgreSQL.
  • Cas d’usage 2 : Précision des calculs financiers
    • Une société de services financiers calcule les intérêts courus sur des portefeuilles de prêts. La division d’entiers par défaut de Spark principal * taux / 365 tronquerait à zéro pour les petits taux journaliers. Le mode ANSI impose une division décimale produisant des résultats précis. Cela peut éviter des erreurs de calcul de plusieurs millions de dollars.
  • Cas d’usage 3 : Pipeline de validation de la qualité des données
    • Un pipeline ETL valide les données entrantes par rapport à des schémas de types stricts. Les enregistrements avec des conversions de types invalides doivent être rejetés et non silencieusement convertis en NULL. Le mode ANSI fait échouer les tâches dès le premier enregistrement incorrect, déclenchant des alertes. Les équipes enquêtent immédiatement sur les problèmes de qualité des données au lieu de découvrir des données corrompues en aval.

Codes

Exemple : Conversion des types et comportement des opérations arithmétiques

Cet exemple montre comment le mode ANSI modifie la conversion des types et les opérations arithmétiques. Il illustre les changements importants par rapport au comportement par défaut.

Spark : Mode par défaut

  1# PySpark Example - Not ANSI SQL Type Safety and Arithmetic
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
  4
  5URL_MASTER = "spark://spark-master:7077"
  6
  7
  8spark = SparkSession.builder \
  9    .appName("SPARK_DefaultSQLMode") \
 10    .config("spark.sql.ansi.enabled", "false") \
 11    .master(URL_MASTER) \
 12    .getOrCreate()
 13
 14
 15print("""
 16##########################
 17##### Build Datasets #####
 18##########################
 19""")
 20
 21# Source system 1: Financial dataset
 22# Demonstrates real-world scenarios where ANSI mode prevents errors
 23financial_data = [
 24    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),    # Standard loan
 25    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),    # Large principal
 26    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),    # High interest rate
 27    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
 28    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),    # Medium loan
 29    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),     # Zero interest (division edge case)
 30    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),   # Large loan
 31    ("LOAN008", 500, 15.00, 365, "2024-01-15"),     # Small principal, high rate
 32    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),    # Standard
 33    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),  # Very large principal (overflow in calculations)
 34]
 35
 36# Source system 2: Problematic Financial dataset
 37# Test dataset with intentionally problematic values for type conversion
 38problematic_data = [
 39    ("CONV001", "12345", "2024-01-15"),   # Valid integer string
 40    ("CONV002", "67890", "2024-01-15"),   # Valid integer string
 41    ("CONV003", "abc123", "2024-01-15"),  # Invalid integer (contains letters)
 42    ("CONV004", "99999", "2024-01-15"),   # Valid integer
 43    ("CONV005", "12.34", "2024-01-15"),   # Decimal in integer field
 44    ("CONV006", "", "2024-01-15"),        # Empty string
 45    ("CONV007", "55555", "2024-01-15"),   # Valid integer
 46    ("CONV008", "1e5", "2024-01-15"),     # Scientific notation
 47    ("CONV009", "42", "2024-01-15"),      # Valid integer
 48    ("CONV010", "NULL", "2024-01-15"),    # String "NULL" not actual NULL
 49]
 50
 51financial_schema = StructType([
 52    StructField("loan_id",  StringType()),
 53    StructField("principal", IntegerType()),
 54    StructField("annual_rate", DoubleType()),  
 55    StructField("days",   IntegerType()),
 56    StructField("date", StringType()),
 57])
 58
 59problematic_schema = StructType([
 60    StructField("record_id", StringType()),
 61    StructField("value_str", StringType()),
 62    StructField("date", StringType()),
 63])
 64
 65
 66# Create DataFrame in both sessions
 67loans_data = spark.createDataFrame(financial_data,financial_schema)
 68print(f"financial_data dataset: {loans_data.count()}")  
 69loans_data.show(truncate=False)
 70
 71
 72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
 73print(f"problematic_data dataset: {problems_data.count()}")  
 74problems_data.show(truncate=False)
 75
 76
 77# Register DataFrames as temporary views for SQL access
 78loans_data.createOrReplaceTempView("loans_data")
 79problems_data.createOrReplaceTempView("problems_data")
 80
 81
 82print("""
 83########################################
 84##### 1. Integer Overflow Behavior #####
 85########################################
 86""")
 87
 88
 89overflow = spark.sql("""
 90    SELECT 
 91        loan_id,
 92        principal,
 93        principal + 1000000 as principal_plus_1m,
 94        CASE 
 95            WHEN principal + 1000000 < 0 THEN 'OVERFLOW DETECTED'
 96            ELSE 'OK'
 97        END as overflow_check
 98    FROM loans_data
 99    WHERE loan_id IN ('LOAN004', 'LOAN010')
100""")
101
102# LOAN004 (at INT max) wrapped to negative value
103overflow.show(truncate=False)
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113
114cast = spark.sql("""
115    SELECT 
116        record_id,
117        value_str,
118        CAST(value_str AS INT) as value_int,
119        CASE 
120            WHEN CAST(value_str AS INT) IS NULL THEN 'CAST FAILED'
121            ELSE 'CAST OK'
122        END as cast_status
123    FROM problems_data
124    ORDER BY record_id
125""")
126
127# Invalid values became NULL without any error indication
128cast.show(truncate=False)
129
130
131spark.stop()

Résultat : Mode par défaut

 1##########################
 2##### Build Datasets #####
 3##########################
 4
 5financial_data dataset: 10
 6+-------+----------+-----------+----+----------+
 7|loan_id|principal |annual_rate|days|date      |
 8+-------+----------+-----------+----+----------+
 9|LOAN001|10000     |5.25       |365 |2024-01-15|
10|LOAN002|50000     |4.75       |365 |2024-01-15|
11|LOAN003|1500      |12.5       |365 |2024-01-15|
12|LOAN004|2147483647|3.0        |365 |2024-01-15|
13|LOAN005|25000     |6.0        |365 |2024-01-15|
14|LOAN006|8000      |0.0        |365 |2024-01-15|
15|LOAN007|100000    |7.25       |365 |2024-01-15|
16|LOAN008|500       |15.0       |365 |2024-01-15|
17|LOAN009|75000     |4.25       |365 |2024-01-15|
18|LOAN010|1000000   |3.5        |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date      |
24+---------+---------+----------+
25|CONV001  |12345    |2024-01-15|
26|CONV002  |67890    |2024-01-15|
27|CONV003  |abc123   |2024-01-15|
28|CONV004  |99999    |2024-01-15|
29|CONV005  |12.34    |2024-01-15|
30|CONV006  |         |2024-01-15|
31|CONV007  |55555    |2024-01-15|
32|CONV008  |1e5      |2024-01-15|
33|CONV009  |42       |2024-01-15|
34|CONV010  |NULL     |2024-01-15|
35+---------+---------+----------+
36
37
38##############################################################
39##### 1. Calculate daily interest using integer division #####
40##############################################################
41
42+-------+----------+-----------+------------------+-----------+
43|loan_id|principal |annual_rate|daily_interest    |result_type|
44+-------+----------+-----------+------------------+-----------+
45|LOAN001|10000     |5.25       |1.4383561643835616|double     |
46|LOAN002|50000     |4.75       |6.506849315068493 |double     |
47|LOAN003|1500      |12.5       |0.5136986301369864|double     |
48|LOAN004|2147483647|3.0        |176505.5052328767 |double     |
49|LOAN005|25000     |6.0        |4.109589041095891 |double     |
50|LOAN006|8000      |0.0        |0.0               |double     |
51|LOAN007|100000    |7.25       |19.863013698630137|double     |
52|LOAN008|500       |15.0       |0.2054794520547945|double     |
53|LOAN009|75000     |4.25       |8.732876712328768 |double     |
54|LOAN010|1000000   |3.5        |95.89041095890411 |double     |
55+-------+----------+-----------+------------------+-----------+
56
57
58########################################
59##### 2. Integer Overflow Behavior #####
60########################################
61
62+-------+----------+-----------------+-----------------+
63|loan_id|principal |principal_plus_1m|overflow_check   |
64+-------+----------+-----------------+-----------------+
65|LOAN004|2147483647|-2146483649      |OVERFLOW DETECTED|
66|LOAN010|1000000   |2000000          |OK               |
67+-------+----------+-----------------+-----------------+
68
69
70####################################
71##### 3. Type Casting Behavior #####
72####################################
73
74+---------+---------+---------+-----------+
75|record_id|value_str|value_int|cast_status|
76+---------+---------+---------+-----------+
77|CONV001  |12345    |12345    |CAST OK    |
78|CONV002  |67890    |67890    |CAST OK    |
79|CONV003  |abc123   |NULL     |CAST FAILED|
80|CONV004  |99999    |99999    |CAST OK    |
81|CONV005  |12.34    |12       |CAST OK    |
82|CONV006  |         |NULL     |CAST FAILED|
83|CONV007  |55555    |55555    |CAST OK    |
84|CONV008  |1e5      |NULL     |CAST FAILED|
85|CONV009  |42       |42       |CAST OK    |
86|CONV010  |NULL     |NULL     |CAST FAILED|
87+---------+---------+---------+-----------+

Spark : Mode ANSI

  1# PySpark Example - ANSI SQL Type Safety and Arithmetic
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
  4
  5URL_MASTER = "spark://spark-master:7077"
  6
  7
  8spark = SparkSession.builder \
  9    .appName("SPARK_ANSISQLMode") \
 10    .config("spark.sql.ansi.enabled", "true") \
 11    .master(URL_MASTER) \
 12    .getOrCreate()
 13
 14
 15print("""
 16##########################
 17##### Build Datasets #####
 18##########################
 19""")
 20
 21# Source system 1: Financial dataset
 22# Demonstrates real-world scenarios where ANSI mode prevents errors
 23financial_data = [
 24    ("LOAN001", 10000, 5.25, 365, "2024-01-15"),    # Standard loan
 25    ("LOAN002", 50000, 4.75, 365, "2024-01-15"),    # Large principal
 26    ("LOAN003", 1500, 12.50, 365, "2024-01-15"),    # High interest rate
 27    ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
 28    ("LOAN005", 25000, 6.00, 365, "2024-01-15"),    # Medium loan
 29    ("LOAN006", 8000, 0.00, 365, "2024-01-15"),     # Zero interest (division edge case)
 30    ("LOAN007", 100000, 7.25, 365, "2024-01-15"),   # Large loan
 31    ("LOAN008", 500, 15.00, 365, "2024-01-15"),     # Small principal, high rate
 32    ("LOAN009", 75000, 4.25, 365, "2024-01-15"),    # Standard
 33    ("LOAN010", 1000000, 3.50, 365, "2024-01-15"),  # Very large principal (overflow in calculations)
 34]
 35
 36# Source system 2: Problematic Financial dataset
 37# Test dataset with intentionally problematic values for type conversion
 38problematic_data = [
 39    ("CONV001", "12345", "2024-01-15"),   # Valid integer string
 40    ("CONV002", "67890", "2024-01-15"),   # Valid integer string
 41    ("CONV003", "abc123", "2024-01-15"),  # Invalid integer (contains letters)
 42    ("CONV004", "99999", "2024-01-15"),   # Valid integer
 43    ("CONV005", "12.34", "2024-01-15"),   # Decimal in integer field
 44    ("CONV006", "", "2024-01-15"),        # Empty string
 45    ("CONV007", "55555", "2024-01-15"),   # Valid integer
 46    ("CONV008", "1e5", "2024-01-15"),     # Scientific notation
 47    ("CONV009", "42", "2024-01-15"),      # Valid integer
 48    ("CONV010", "NULL", "2024-01-15"),    # String "NULL" not actual NULL
 49]
 50
 51financial_schema = StructType([
 52    StructField("loan_id",  StringType()),
 53    StructField("principal", IntegerType()),
 54    StructField("annual_rate", DoubleType()),  
 55    StructField("days",   IntegerType()),
 56    StructField("date", StringType()),
 57])
 58
 59problematic_schema = StructType([
 60    StructField("record_id", StringType()),
 61    StructField("value_str", StringType()),
 62    StructField("date", StringType()),
 63])
 64
 65
 66# Create DataFrame in both sessions
 67loans_data = spark.createDataFrame(financial_data,financial_schema)
 68print(f"financial_data dataset: {loans_data.count()}")  
 69loans_data.show(truncate=False)
 70
 71
 72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
 73print(f"problematic_data dataset: {problems_data.count()}")  
 74problems_data.show(truncate=False)
 75
 76
 77# Register DataFrames as temporary views for SQL access
 78loans_data.createOrReplaceTempView("loans_data")
 79problems_data.createOrReplaceTempView("problems_data")
 80
 81
 82print("""
 83########################################
 84##### 1. Integer Overflow Behavior #####
 85########################################
 86""")
 87
 88
 89try:
 90    overflow = spark.sql("""
 91        SELECT 
 92            loan_id,
 93            principal,
 94            principal + 1000000 as principal_plus_1m
 95        FROM loans_data
 96        WHERE loan_id IN ('LOAN004', 'LOAN010')
 97    """)
 98    overflow.show(truncate=False)
 99    print("Query completed (no overflow detected)")
100except Exception as e:
101    print(f"Exception Caught: {type(e).__name__}")
102    print(f"Message: {str(e)[:200]}")
103    print("\nThis is EXPECTED in ANSI mode - overflow is an error, not silent corruption")
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113try:
114    cast = spark.sql("""
115        SELECT 
116            record_id,
117            value_str,
118            CAST(value_str AS INT) as value_int
119        FROM problems_data
120        ORDER BY record_id
121    """)
122    cast.show(truncate=False)
123except Exception as e:
124    print(f"Exception Caught: {type(e).__name__}")
125    print(f"Failed on record: {str(e)[str(e).find('CONV'):str(e).find('CONV')+10] if 'CONV' in str(e) else 'unknown'}")
126    print("\nThis is EXPECTED in ANSI mode - forces explicit error handling")
127
128
129print("\n##### with TRY_CAST: Safe Error Handling #####")
130try_cast = spark.sql("""
131    SELECT 
132        record_id,
133        value_str,
134        TRY_CAST(value_str AS INT) as value_int,
135        CASE 
136            WHEN TRY_CAST(value_str AS INT) IS NULL THEN 'CONVERSION_ERROR'
137            ELSE 'OK'
138        END as validation_status
139    FROM problems_data
140    ORDER BY record_id
141""")
142# TRY_CAST provides NULL on error but explicitly marks rows as problematic
143try_cast.show(truncate=False)
144
145
146spark.stop()

Résultat : Mode ANSI

 1##########################
 2##### Build Datasets #####
 3##########################
 4
 5financial_data dataset: 10
 6+-------+----------+-----------+----+----------+
 7|loan_id|principal |annual_rate|days|date      |
 8+-------+----------+-----------+----+----------+
 9|LOAN001|10000     |5.25       |365 |2024-01-15|
10|LOAN002|50000     |4.75       |365 |2024-01-15|
11|LOAN003|1500      |12.5       |365 |2024-01-15|
12|LOAN004|2147483647|3.0        |365 |2024-01-15|
13|LOAN005|25000     |6.0        |365 |2024-01-15|
14|LOAN006|8000      |0.0        |365 |2024-01-15|
15|LOAN007|100000    |7.25       |365 |2024-01-15|
16|LOAN008|500       |15.0       |365 |2024-01-15|
17|LOAN009|75000     |4.25       |365 |2024-01-15|
18|LOAN010|1000000   |3.5        |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date      |
24+---------+---------+----------+
25|CONV001  |12345    |2024-01-15|
26|CONV002  |67890    |2024-01-15|
27|CONV003  |abc123   |2024-01-15|
28|CONV004  |99999    |2024-01-15|
29|CONV005  |12.34    |2024-01-15|
30|CONV006  |         |2024-01-15|
31|CONV007  |55555    |2024-01-15|
32|CONV008  |1e5      |2024-01-15|
33|CONV009  |42       |2024-01-15|
34|CONV010  |NULL     |2024-01-15|
35+---------+---------+----------+
36
37
38########################################
39##### 1. Integer Overflow Behavior #####
40########################################
41
42== SQL (line 5, position 13) ==
43            principal + 1000000 as principal_plus_1m
44            ^^^^^^^^^^^^^^^^^^^
45
46	at org.apache.spark.sql.errors.ExecutionErrors.arithmeticOverflowError(ExecutionErrors.scala:132)
47	...
48	at java.base/java.lang.Thread.run(Thread.java:1583)
49
50
51Exception Caught: ArithmeticException
52Message: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22003
53== SQL (l
54
55This is EXPECTED in ANSI mode - overflow is an error, not silent corruption
56
57####################################
58##### 2. Type Casting Behavior #####
59####################################
60
61== SQL (line 5, position 13) ==
62            CAST(value_str AS INT) as value_int
63            ^^^^^^^^^^^^^^^^^^^^^^
64
65	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
66	...
67	at java.base/java.lang.Thread.run(Thread.java:1583)
68
69Exception Caught: NumberFormatException
70Failed on record: unknown
71
72This is EXPECTED in ANSI mode - forces explicit error handling
73
74##### with TRY_CAST: Safe Error Handling #####
75
76+---------+---------+---------+-----------------+
77|record_id|value_str|value_int|validation_status|
78+---------+---------+---------+-----------------+
79|CONV001  |12345    |12345    |OK               |
80|CONV002  |67890    |67890    |OK               |
81|CONV003  |abc123   |NULL     |CONVERSION_ERROR |
82|CONV004  |99999    |99999    |OK               |
83|CONV005  |12.34    |NULL     |CONVERSION_ERROR |
84|CONV006  |         |NULL     |CONVERSION_ERROR |
85|CONV007  |55555    |55555    |OK               |
86|CONV008  |1e5      |NULL     |CONVERSION_ERROR |
87|CONV009  |42       |42       |OK               |
88|CONV010  |NULL     |NULL     |CONVERSION_ERROR |
89+---------+---------+---------+-----------------+

Variables SQL de Session

Introduction

Les variables SQL de session permettent le paramétrage dynamique des requêtes sans concaténation de chaînes de caractères ni de fichiers de configuration externes. (Les variables sont limitées à la session et typées)

Détail

Avant Spark 4.0, le paramétrage des requêtes SQL nécessitait trois approches problématiques :

  1. Interpolation des chaînes de caractères dans le code applicatif
    • Risque : injection SQL si country provient d’une entrée utilisateur. Nécessite la recompilation du plan de requête pour chaque valeur de paramètre.
    • Exemple :
1country = "US"
2df = spark.sql(f"SELECT * FROM orders WHERE country = '{country}'")
  1. Propriétés de configuration Spark
    • Syntaxe verbeuse. Limité aux types String. Pollution de l’espace de noms de configuration.
    • Exemple :
1spark.conf.set("my.param.country", "US")
2spark.sql("SELECT * FROM orders WHERE country = '${spark.my.param.country}'")
  1. Fichiers de configuration externes
    • Ajoute une dépendance aux systèmes externes. Possible dérive de configuration entre les environnements.
    • Exemple :
1params = yaml.load("config.yaml")
2spark.sql(f"SELECT * FROM orders WHERE country = '{params['country']}'")

Spark 4.0 introduit les variables SQL qui résolvent ces problèmes. Les variables sont déclarées avec des types explicites et limitées à la session :

1DECLARE min_amount DECIMAL(10,2) DEFAULT 100.0;
2SELECT * FROM orders WHERE amount >= min_amount;

Les deux syntaxes supportées :

  1. Instruction DECLARE (Norme SQL)
    • Déclaration de type explicite. Requis en mode ANSI SQL.
    • Exemple :
1DECLARE variable_name TYPE [DEFAULT value];
  1. Instruction SET (Rétrocompatibilité)
    • Type déduit de la valeur. Compatible avec la syntaxe des propriétés de session Spark 3.x. Les variables sont typées de manière sûre une fois définies.
    • Exemple :
1SET variable_name = value;

La référence est résolue lors de la compilation de la requête, pas à l’exécution. Cela permet à l’optimiseur Catalyst d’utiliser les valeurs des variables pour le pushdown de prédicats et l’élagage de partitions (partition pruning).

Le système de variables maintient une table de symboles au niveau de la session. Chaque variable possède :

  • Nom : Identifiant utilisé dans les requêtes
  • Type : Type de données scalaire (Int, String, Decimal, Boolean, Date, Timestamp)
  • Valeur : Valeur actuelle (mutable avec SET ultérieurs)
  • Portée : Session uniquement (non visible par les autres sessions)

Les variables s’intègrent à la planification des requêtes. Lorsque Catalyst analyse une requête avec variable_name, il :

  1. Recherche la variable dans la table de symboles de session
  2. Remplace la référence par la valeur littérale
  3. Procède à l’optimisation en utilisant la valeur concrète

Les performances d’exécution sont identiques aux littéraux codés en dur.

Avantages

  1. Paramétrage Type-Safe : Les variables ont des types explicites vérifiés à la déclaration. L’assignation d’un mauvais type produit une erreur de compilation. Cela prévient les erreurs de type à l’exécution provenant de paramètres mal configurés.
  2. Prévention de l’injection SQL : Les références de variables sont résolues lors de la compilation de la requête, pas par substitution de chaînes de caractères à l’exécution. Les entrées malveillantes dans les valeurs de variables ne peuvent pas altérer la structure de la requête.
  3. Configuration multi-environnement simplifiée : Le même script SQL s’exécute en dev/rec/prod avec différentes valeurs de variables. Aucune modification de code entre les environnements. Les variables peuvent être définies à partir d’arguments en ligne de commande ou de variables d’environnement au démarrage de la session.
  4. Paramétrage pris en compte par l’optimiseur Catalyst : L’optimiseur Catalyst voit les valeurs des variables lors de la planification. Cela permet l’élagage de partitions (partition pruning) et le pushdown de prédicats basés sur les valeurs des variables.

Limitations

  1. Portée limitée à la session : Les variables ne persistent pas après la clôture de la session. Pas de variables globales partagées entre sessions concurrentes. Chaque client doit redéclarer les variables. Inadapté pour la configuration au niveau du cluster nécessitant une cohérence entre les jobs.
  2. Pas de types complexes : Les variables ne supportent que les types scalaires. Impossible de déclarer des variables ARRAY, STRUCT ou MAP. Le passage de valeurs de paramètres complexes nécessite toujours des propriétés de configuration ou un stockage externe.
  3. Pas d’API de persistance des variables : Les variables ne peuvent pas être sauvegardées dans le metastore ou un stockage externe. Aucun mécanisme intégré pour charger des variables depuis des fichiers YAML/JSON. Les équipes doivent construire une gestion de variables personnalisée au-dessus des variables SQL de base.

Cas d’usages (concret)

  • Cas d’usage 1 : Rapports paramétrés
    • Définir une plage de dates en début de script et l’utiliser dans plusieurs requêtes sans répétition.
  • Cas d’usage 2 : Traitement de données spécifique à l’environnement
    • Un pipeline ETL s’exécute dans les environnements dev, rec et prod. Chaque environnement lit depuis différents préfixes de base de données (dev_db.orders, prod_db.orders). Les variables source_database et target_database sont définies à partir de variables d’environnement au démarrage du job. Le même script SQL fonctionne dans tous les environnements sans modification.

Codes

Exemple : Fonctionnement des variables SQL

Spark

  1# PySpark Example : Demonstrates how variables works
  2from pyspark.sql import SparkSession
  3from pyspark.sql.types import DoubleType, StringType, StructField, StructType
  4from pyspark.sql.functions import to_timestamp
  5
  6URL_MASTER = "spark://spark-master:7077"
  7
  8
  9spark = SparkSession.builder \
 10    .appName("SPARK_APPVARIABLE") \
 11    .config("spark.sql.ansi.enabled", "true") \
 12    .master(URL_MASTER) \
 13    .getOrCreate()
 14
 15
 16print("""
 17##########################
 18##### Build Datasets #####
 19##########################
 20""")
 21
 22# Source system 1: Sensor dataset
 23# Demonstrates real-world scenarios where ANSI mode prevents errors
 24sensor_readings = [
 25    ("SENSOR_01", "2024-01-15 10:00:00", 22.5, 45.2, "warehouse_A", "normal"),
 26    ("SENSOR_02", "2024-01-15 10:05:00", 35.8, 78.5, "warehouse_A", "normal"),  # High temp
 27    ("SENSOR_03", "2024-01-15 10:10:00", 18.2, 42.1, "warehouse_B", "normal"),
 28    ("SENSOR_01", "2024-01-15 10:15:00", 23.1, 46.8, "warehouse_A", "normal"),
 29    ("SENSOR_02", "2024-01-15 10:20:00", 38.5, 82.3, "warehouse_A", "alert"),   # Anomaly
 30    ("SENSOR_04", "2024-01-15 10:25:00", 15.2, 38.5, "warehouse_C", "normal"),  # Low temp
 31    ("SENSOR_03", "2024-01-15 10:30:00", 19.1, 43.7, "warehouse_B", "normal"),
 32    ("SENSOR_01", "2024-01-15 10:35:00", 24.2, 47.5, "warehouse_A", "normal"),
 33    ("SENSOR_04", "2024-01-15 10:40:00", 12.8, 35.2, "warehouse_C", "alert"),   # Anomaly
 34    ("SENSOR_02", "2024-01-15 10:45:00", 32.1, 72.5, "warehouse_A", "normal"),
 35]
 36
 37
 38sensor_schema = StructType([
 39    StructField("sensor_id",  StringType()),
 40    StructField("cur_timestamp", StringType()),
 41    StructField("temperature", DoubleType()),  
 42    StructField("humidity",   DoubleType()),
 43    StructField("location", StringType()),
 44    StructField("status", StringType()),
 45])
 46
 47
 48# Create DataFrame
 49sensors_df = spark.createDataFrame(sensor_readings,sensor_schema).withColumn("cur_timestamp",to_timestamp("cur_timestamp", format="yyyy-MM-dd HH:mm:ss"))
 50print(f"sensor dataset: {sensors_df.count()}")  
 51sensors_df.show(truncate=False)
 52
 53# Register DataFrames as temporary views for SQL access
 54sensors_df.createOrReplaceTempView("sensors_data")
 55
 56
 57
 58print("""
 59########################################
 60##### 1. Time-Based Variable Usage #####
 61########################################
 62""")
 63
 64spark.sql("DECLARE VARIABLE analysis_start TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:00:00'")
 65spark.sql("DECLARE VARIABLE analysis_end TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:15:00'")
 66
 67query_time_based = """
 68    SELECT 
 69        sensor_id,
 70        location,
 71        COUNT(*) as readings_in_window,
 72        AVG(temperature) as avg_temp,
 73        MAX(temperature) - MIN(temperature) as temp_range,
 74        analysis_start as window_start,
 75        analysis_end as window_end
 76    FROM sensors_data
 77    WHERE cur_timestamp BETWEEN analysis_start AND analysis_end
 78    GROUP BY sensor_id, location
 79    ORDER BY sensor_id, location;
 80"""
 81
 82print("EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00")
 83time_based = spark.sql(query_time_based)
 84print(f"Result records: {time_based.count()}")  
 85time_based.show(truncate=False)
 86
 87
 88print("EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00")
 89spark.sql("SET VARIABLE analysis_end = TIMESTAMP'2024-01-15 10:30:00'")
 90time_based = spark.sql(query_time_based)
 91print(f"Result records: {time_based.count()}")  
 92time_based.show(truncate=False)
 93
 94
 95
 96print("""
 97################################################
 98##### 2. Variables vs String Interpolation #####
 99################################################
100""")
101spark.sql("DECLARE VARIABLE target_location STRING DEFAULT 'warehouse_A'")
102
103print("UNSAFE: String interpolation (vulnerable to SQL injection)")
104def get_user_input() -> str : 
105    return "warehouse_A' OR '1'='1"
106
107target_location = get_user_input() 
108unsafe_interpolation = spark.sql(f"SELECT * FROM sensors_data WHERE location = '{target_location}'")
109print(f"Result records: {unsafe_interpolation.count()}")  
110unsafe_interpolation.show(truncate=False)
111
112
113print("SAFE: Variable substitution (injection-proof)")
114#Malicious input becomes a string value, not SQL code
115spark.sql("SET VARIABLE target_location = \"warehouse_A' OR '1'='1\"")
116safe_interpolation = spark.sql("SELECT * FROM sensors_data WHERE location = target_location")
117print(f"Result records: {safe_interpolation.count()}")  
118safe_interpolation.show(truncate=False)
119# Variable value is treated as string literal "US' OR '1'='1"
120
121
122
123spark.stop()

Résultat

 1##########################
 2##### Build Datasets #####
 3##########################
 4
 5sensor dataset: 10
 6+---------+-------------------+-----------+--------+-----------+------+
 7|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
 8+---------+-------------------+-----------+--------+-----------+------+
 9|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
10|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
11|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
12|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
13|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
14|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
15|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
16|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
17|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
18|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
19+---------+-------------------+-----------+--------+-----------+------+
20
21
22########################################
23##### 1. Time-Based Variable Usage #####
24########################################
25
26EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00
27Result records: 3
28+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
29|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
30+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
31|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:15:00|
32|SENSOR_02|warehouse_A|1                 |35.8    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
33|SENSOR_03|warehouse_B|1                 |18.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:15:00|
34+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
35
36EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00
37Result records: 4
38+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
39|sensor_id|location   |readings_in_window|avg_temp|temp_range        |window_start       |window_end         |
40+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
41|SENSOR_01|warehouse_A|2                 |22.8    |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:30:00|
42|SENSOR_02|warehouse_A|2                 |37.15   |2.700000000000003 |2024-01-15 10:00:00|2024-01-15 10:30:00|
43|SENSOR_03|warehouse_B|2                 |18.65   |0.9000000000000021|2024-01-15 10:00:00|2024-01-15 10:30:00|
44|SENSOR_04|warehouse_C|1                 |15.2    |0.0               |2024-01-15 10:00:00|2024-01-15 10:30:00|
45+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
46
47
48################################################
49##### 2. Variables vs String Interpolation #####
50################################################
51
52UNSAFE: String interpolation (vulnerable to SQL injection)
53Result records: 10
54+---------+-------------------+-----------+--------+-----------+------+
55|sensor_id|cur_timestamp      |temperature|humidity|location   |status|
56+---------+-------------------+-----------+--------+-----------+------+
57|SENSOR_01|2024-01-15 10:00:00|22.5       |45.2    |warehouse_A|normal|
58|SENSOR_02|2024-01-15 10:05:00|35.8       |78.5    |warehouse_A|normal|
59|SENSOR_03|2024-01-15 10:10:00|18.2       |42.1    |warehouse_B|normal|
60|SENSOR_01|2024-01-15 10:15:00|23.1       |46.8    |warehouse_A|normal|
61|SENSOR_02|2024-01-15 10:20:00|38.5       |82.3    |warehouse_A|alert |
62|SENSOR_04|2024-01-15 10:25:00|15.2       |38.5    |warehouse_C|normal|
63|SENSOR_03|2024-01-15 10:30:00|19.1       |43.7    |warehouse_B|normal|
64|SENSOR_01|2024-01-15 10:35:00|24.2       |47.5    |warehouse_A|normal|
65|SENSOR_04|2024-01-15 10:40:00|12.8       |35.2    |warehouse_C|alert |
66|SENSOR_02|2024-01-15 10:45:00|32.1       |72.5    |warehouse_A|normal|
67+---------+-------------------+-----------+--------+-----------+------+
68
69SAFE: Variable substitution (injection-proof)
70Result records: 0
71+---------+-------------+-----------+--------+--------+------+
72|sensor_id|cur_timestamp|temperature|humidity|location|status|
73+---------+-------------+-----------+--------+--------+------+
74+---------+-------------+-----------+--------+--------+------+

Scripts SQL Multi-Instructions

Introduction

Les scripts SQL multi-instructions permettent d’exécuter plusieurs commandes SQL à partir d’un seul ensemble d’instructions (script) avec un flux de contrôle procédural. Cela permet de prendre en charge les déclarations de variables, la logique conditionnelle (IF/ELSE), les boucles (WHILE, FOR), la gestion des exceptions (blocs BEGIN/END) et le séquencement des instructions.

Détail

Avant Spark 4.0, l’exécution de plusieurs instructions SQL nécessitait soit :

  1. Orchestration au niveau applicatif : Chaque instruction est un appel API séparé. Aucune sémantique transactionnelle. La gestion des échecs nécessite des blocs try-catch dans le code applicatif.
1spark.sql("CREATE TABLE staging AS SELECT * FROM source")
2spark.sql("DELETE FROM staging WHERE invalid = true")
3spark.sql("INSERT INTO target SELECT * FROM staging")
  1. Outils de workflow externes : Ajoute de la complexité opérationnelle. Nécessite une infrastructure d’orchestration séparée. Une logique SQL simple devient une configuration Python/YAML.
  2. Invocations multiples de Spark SQL : Aucun partage d’état entre les invocations. Impossible de passer des variables. Mauvaises performances dues à l’initialisation répétée de la session.

Spark 4.0 introduit les scripts multi-instructions qui s’exécutent comme une seule unité. Les scripts contiennent plusieurs instructions SQL séparées par des points-virgules, avec un flux de contrôle procédural.

L’analyseur de script décompose l’entrée en instructions individuelles et construit un plan d’exécution. Chaque instruction s’exécute séquentiellement. Les variables déclarées dans les premières instructions sont disponibles pour les instructions ultérieures. Les instructions de flux de contrôle (IF/ELSE, WHILE) modifient l’ordre d’exécution en fonction des conditions d’exécution.

  • Les scripts s’exécutent sur le driver Spark.
  • Le flux de contrôle est évalué sur le driver, et non distribué aux exécuteurs.
  • Chaque instruction passe par l’optimisation et l’exécution Catalyst normales.
  • Le modèle d’exécution est séquentiel, non transactionnel : Si l’instruction 3 échoue, les instructions 1 et 2 ont déjà validé leurs modifications. Il n’y a pas de rollback automatique. Cela diffère des procédures stockées de bases de données traditionnelles qui fournissent souvent une sémantique transactionnelle.

Constructions linguistiques clés :

Avantages

  1. Dépendances externes réduites : Les pipelines ETL simples ne nécessitent plus des outils d’orchestration spécifiques. Les scripts SQL peuvent implémenter la logique de validation, transformation et chargement dans un seul fichier. Cela peut réduire la complexité opérationnelle et les coûts d’infrastructure.
  2. Logique procédurale en SQL : Les constructions IF/ELSE et WHILE permettent l’implémentation de la logique métier directement en SQL. Réduit les changements de contexte entre SQL et Python/Scala. Les équipes ayant de solides compétences SQL peuvent construire des pipelines complets sans code applicatif.
  3. Contrôle de version et portabilité : Les scripts sont des fichiers texte brut qui résident dans le contrôle de version. Les modifications sont suivies via Git. Les scripts sont portables entre les déploiements Spark (local, cluster, Connect). Plus facile à réviser et auditer que du code applicatif réparti sur plusieurs fichiers.
  4. Exécution côté serveur : Avec Spark Connect, le script entier est envoyé au serveur et s’exécute à distance. Réduit les allers-retours réseau par rapport à la soumission instruction par instruction. Le client ne reçoit que les résultats finaux, pas l’état intermédiaire.

Limitations

  1. Pas de rollback transactionnel : Les instructions échouées n’annulent pas automatiquement les instructions précédentes. Si le script crée une table de staging, charge des données, puis échoue lors de la validation, la table de staging reste. Un nettoyage manuel est requis. Ceci est fondamentalement différent des bases de données ACID.
  2. Capacités de correction des erreurs limitées : Pas de correction des erreurs interactive. Pas de points d’arrêt. Pas d’inspection de variables pendant l’exécution. La correction d’erreur nécessite d’ajouter des instructions SELECT pour afficher les valeurs intermédiaires. Les messages d’erreur peuvent ne pas indiquer clairement quelle instruction a échoué dans les scripts complexes.
  3. Seuil de complexité : La logique complexe est mieux implémentée dans le code applicatif avec des frameworks de tests appropriés. Les scripts doivent rester des pipelines de données linéaires, et non des applications complètes.
  4. Limitation dans la gestion des erreurs : Impossible de capturer des types d’erreur spécifiques avec la granularité du try-catch en Python/Scala. Impossible de continuer l’exécution après certaines erreurs. Le modèle d’échec de script est tout-ou-rien pour de nombreux types d’erreur.

Cas d’usages (concret)

  • Cas d’usage 1 : Chargement Incrémental
    • Une boucle qui traite 24 partitions horaires une par une.
    • Un script qui ajoute de nouvelles partitions pour les 30 prochains jours, en vérifiant si elles existent déjà.
  • Cas d’usage 2 : Déploiement Spécifique à l’Environnement
    • Les scripts de déploiement doivent créer différentes structures de tables en dev et en prod. Le script utilise IF/ELSE basé sur une variable d’environnement pour exécuter les instructions CREATE TABLE appropriées. Un seul script remplace des fichiers SQL séparés pour dev et prod.

Codes

Exemple : Exécution d’un script SQL multi-instructions avec Spark Connect

Spark

Contenu du script Spark Connect :

 1from pyspark.sql import SparkSession
 2
 3REMOTE_URL = "sc://localhost:15002"
 4SCRIPT_SQL_NAME = "example_SQL.sql"
 5
 6# Connexion to Spark Connect
 7spark = SparkSession.builder \
 8    .remote(REMOTE_URL) \
 9    .appName("SPARK_SQLScript") \
10    .config("spark.sql.ansi.enabled", "true") \
11    .getOrCreate()
12
13print("Successfully connected to Spark!")
14print(f"Spark version: {spark.version}")
15
16# Script reading
17with open(SCRIPT_SQL_NAME, "r", encoding="utf-8") as f:
18    sql_script = f.read()
19
20# Script SQL execution with only one spark call
21result = spark.sql(sql_script)
22result.show()

Contenu du script SQL example_SQL.sql :

  1-- Spark 4.x - SQL Scripting
  2
  3BEGIN
  4    -- =============================================
  5    -- STEP n°1 : VARIABLES
  6    -- =============================================
  7    DECLARE batch_date DATE DEFAULT CURRENT_DATE();
  8
  9    -- Business thresholds
 10    DECLARE min_valid_amount DECIMAL(10,2) DEFAULT 10.00;
 11    DECLARE standard_threshold DECIMAL(10,2) DEFAULT 100.00;
 12    DECLARE premium_threshold DECIMAL(10,2) DEFAULT 1000.00;
 13
 14    -- Metrics
 15    DECLARE total_processed INT DEFAULT 0;
 16    DECLARE total_invalid INT DEFAULT 0;
 17    DECLARE total_revenue DECIMAL(15,2) DEFAULT 0.0;
 18
 19    -- =============================================
 20    -- STEP n°2 : TABLES INITIALIZATION
 21    -- =============================================
 22
 23    -- Staging table for raw data
 24    DROP TABLE IF EXISTS raw_orders;
 25    CREATE TABLE raw_orders (
 26        order_id BIGINT,
 27        customer_id BIGINT,
 28        order_date DATE,
 29        amount DECIMAL(10,2),
 30        status STRING,
 31        region STRING
 32    ) USING parquet;
 33
 34    -- Table of validated orders
 35    DROP TABLE IF EXISTS validated_orders;
 36    CREATE TABLE validated_orders (
 37        order_id BIGINT,
 38        customer_id BIGINT,
 39        order_date DATE,
 40        amount DECIMAL(10,2),
 41        region STRING,
 42        order_category STRING,
 43        loyalty_points DECIMAL(10,2),
 44        processing_date DATE
 45    ) USING parquet;
 46
 47    -- Quarantine table for invalid data
 48    DROP TABLE IF EXISTS invalid_orders;
 49    CREATE TABLE invalid_orders (
 50        order_id BIGINT,
 51        customer_id BIGINT,
 52        amount DECIMAL(10,2),
 53        error_reason STRING,
 54        quarantine_timestamp TIMESTAMP
 55    ) USING parquet;
 56
 57    -- =============================================
 58    -- STEP n°3 : DATASETS
 59    -- =============================================
 60
 61    INSERT INTO raw_orders VALUES
 62        (1001, 501, DATE'2024-01-15', 150.50, 'completed', 'EMEA'),
 63        (1002, 502, DATE'2024-01-15', 1250.00, 'completed', 'AMER'),
 64        (1003, 503, DATE'2024-01-15', -50.00, 'completed', 'APAC'), -- Invalid: negative
 65        (1004, 504, DATE'2024-01-15', 890.00, 'completed', 'EMEA'),
 66        (2001, 505, DATE'2024-01-16', 2100.00, 'completed', 'AMER'),
 67        (2002, 506, DATE'2024-01-16', NULL, 'completed', 'APAC'), -- Invalid: NULL amount
 68        (2003, 507, DATE'2024-01-16', 450.00, 'completed', 'EMEA'),
 69        (2004, 508, DATE'2024-01-16', 5.00, 'completed', 'AMER'), -- Invalid: below minimum
 70        (3001, 509, DATE'2024-01-17', 750.00, 'completed', 'APAC'),
 71        (3002, 510, DATE'2024-01-17', 1500.00, 'completed', 'EMEA'),
 72        (3003, 511, DATE'2024-01-17', 220.00, 'completed', 'AMER');
 73
 74    -- =============================================
 75    -- STEP n°4 : DATA VALIDATION
 76    -- =============================================
 77
 78    -- Quarantining invalid records
 79    INSERT INTO invalid_orders
 80    SELECT 
 81        order_id,
 82        customer_id,
 83        amount,
 84        CASE 
 85            WHEN amount IS NULL THEN 'NULL_AMOUNT'
 86            WHEN amount < 0 THEN 'NEGATIVE_AMOUNT'
 87            WHEN amount < min_valid_amount THEN 'BELOW_MINIMUM'
 88            ELSE 'UNKNOWN_ERROR'
 89        END AS error_reason,
 90        CURRENT_TIMESTAMP() AS quarantine_timestamp
 91    FROM raw_orders
 92    WHERE status = 'completed'
 93        AND (amount IS NULL 
 94            OR amount < 0 
 95            OR amount < min_valid_amount);
 96
 97    -- Counting invalid rows
 98    SET total_invalid = (SELECT COUNT(*) FROM invalid_orders);
 99            
100    -- =============================================
101    -- STEP n°5 : VALID DATA TRANSFORMATION
102    -- =============================================
103
104    INSERT INTO validated_orders
105    SELECT 
106        order_id,
107        customer_id,
108        order_date,
109        amount,
110        region,
111        -- CASE WHEN classification
112        CASE 
113            WHEN amount >= premium_threshold THEN 'PREMIUM'
114            WHEN amount >= standard_threshold THEN 'STANDARD'
115            ELSE 'BASIC'
116        END AS order_category,
117        -- Calculation of loyalty points
118        CASE 
119            WHEN amount >= premium_threshold THEN amount * 0.10
120            WHEN amount >= standard_threshold THEN amount * 0.05
121            ELSE amount * 0.02
122        END AS loyalty_points,
123        batch_date AS processing_date
124    FROM raw_orders
125    WHERE status = 'completed'
126        AND amount IS NOT NULL
127        AND amount >= min_valid_amount;
128        
129    -- Counting total rows processed
130    SET total_processed = (SELECT COUNT(*) FROM validated_orders WHERE processing_date = batch_date);
131            
132    -- Revenue calculation
133    SET total_revenue = (SELECT COALESCE(SUM(amount), 0) FROM validated_orders WHERE processing_date = batch_date);
134
135
136    -- =============================================
137    -- STEP n°6 : GENERATION OF FINAL REPORTS
138    -- =============================================
139    SELECT '========== EXECUTION - SUMMARY ==========' AS message;
140
141    -- Execution report
142    SELECT 'Batch Date' as `Metric`, CAST(batch_date AS STRING) as `Value`
143    UNION ALL SELECT 'Total Valid Orders', CAST(total_processed AS STRING)
144    UNION ALL SELECT 'Total Invalid Orders', CAST(total_invalid AS STRING)
145    UNION ALL SELECT 'Total Revenue', CAST(total_revenue AS STRING)
146    UNION ALL SELECT 'Invalid Ratio %', CAST(ROUND((total_invalid * 100.0) / GREATEST(total_processed + total_invalid, 1), 2) AS STRING);
147
148    -- =============================================
149    -- STEP n°7 : DROP TABLES
150    -- =============================================
151    DROP TABLE IF EXISTS raw_orders;
152    DROP TABLE IF EXISTS validated_orders;
153    DROP TABLE IF EXISTS invalid_orders;
154
155END;

Résultat

 1Successfully connected to Spark!
 2Spark version: 4.1.1
 3+--------------------+----------+
 4|              Metric|     Value|
 5+--------------------+----------+
 6|          Batch Date|2026-03-10|
 7|  Total Valid Orders|         8|
 8|Total Invalid Orders|         3|
 9|       Total Revenue|   7310.50|
10|     Invalid Ratio %|     27.27|
11+--------------------+----------+